diff --git a/Cargo.toml b/Cargo.toml
index bc67b40c..3cde6fe7 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -16,6 +16,7 @@ fxhash = "0.2"
 nix = "0.18.0"
 crossbeam = "0.7"
 thiserror = "1.0"
+futures = { version = "0.3", features = ["thread-pool", "compat"] }

 [dev-dependencies]
 raft = { git = "https://github.com/tikv/raft-rs", branch = "master", default-features = false, features = ["protobuf-codec"] }
diff --git a/src/cache_evict.rs b/src/cache_evict.rs
index 1dede74d..22fa56ac 100644
--- a/src/cache_evict.rs
+++ b/src/cache_evict.rs
@@ -64,13 +64,8 @@ impl CacheSubmitor {
         self.block_on_full = false;
     }

-    pub fn get_cache_tracker(
-        &mut self,
-        file_num: u64,
-        offset: u64,
-        size: usize,
-    ) -> Option<CacheTracker> {
-        if self.cache_limit == 0 || size == 0 {
+    pub fn get_cache_tracker(&mut self, file_num: u64, offset: u64) -> Option<CacheTracker> {
+        if self.cache_limit == 0 {
             return None;
         }

@@ -109,14 +104,17 @@ impl CacheSubmitor {
             }
         }

-        self.chunk_size += size;
-        self.size_tracker.fetch_add(size, Ordering::Release);
         Some(CacheTracker::new(
             self.global_stats.clone(),
             self.size_tracker.clone(),
         ))
     }

+    pub fn fill_chunk(&mut self, size: usize) {
+        self.chunk_size += size;
+        self.size_tracker.fetch_add(size, Ordering::Release);
+    }
+
     fn reset(&mut self, file_num: u64, offset: u64) {
         self.file_num = file_num;
         self.offset = offset;
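Note on the `CacheSubmitor` change above: cache-chunk accounting is split into two calls, so a caller can first obtain a tracker for a write position and account the written bytes separately. A minimal sketch of the intended call order (the `account_write` helper is hypothetical; only the `CacheSubmitor` API is from this patch):

```rust
use crate::cache_evict::{CacheSubmitor, CacheTracker};

// Hypothetical helper mirroring how WalRunner (src/wal.rs below) drives the API.
fn account_write(
    submitor: &mut CacheSubmitor,
    file_num: u64,
    offset: u64,
    entries_size: usize,
) -> Option<CacheTracker> {
    // May rotate to a new cache chunk; returns None when the cache is
    // disabled (cache_limit == 0). It no longer adds any bytes itself.
    let tracker = submitor.get_cache_tracker(file_num, offset);
    // Byte accounting, previously fused into get_cache_tracker, is explicit now.
    submitor.fill_chunk(entries_size);
    tracker
}
```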
diff --git a/src/engine.rs b/src/engine.rs
index 338b81b1..e78dc8ae 100644
--- a/src/engine.rs
+++ b/src/engine.rs
@@ -1,8 +1,13 @@
 use std::io::BufRead;
+use std::sync::mpsc::{channel, Sender};
 use std::sync::{Arc, RwLock};
+use std::thread::{Builder as ThreadBuilder, JoinHandle};
 use std::time::{Duration, Instant};
 use std::{fmt, u64};

+use futures::channel as future_channel;
+use futures::executor::block_on;
+
 use log::{info, warn};
 use protobuf::Message;

@@ -10,15 +15,19 @@ use crate::cache_evict::{
     CacheSubmitor, CacheTask, Runner as CacheEvictRunner, DEFAULT_CACHE_CHUNK_SIZE,
 };
 use crate::config::{Config, RecoveryMode};
+use crate::errors::Error;
 use crate::log_batch::{
-    self, Command, CompressionType, EntryExt, LogBatch, LogItemContent, OpType, CHECKSUM_LEN,
-    HEADER_LEN,
+    self, Command, CompressionType, EntryExt, LogBatch, LogItem, LogItemContent, OpType,
+    CHECKSUM_LEN, HEADER_LEN,
 };
 use crate::memtable::{EntryIndex, MemTable};
 use crate::pipe_log::{GenericPipeLog, LogQueue, PipeLog, FILE_MAGIC_HEADER, VERSION};
-use crate::purge::PurgeManager;
-use crate::util::{HandyRwLock, HashMap, Worker};
+use crate::purge::{PurgeManager, RemovedMemtables};
+use crate::util::{HandyRwLock, HashMap, Statistic, Worker};
+use crate::wal::{LogMsg, WalRunner, WriteTask};
 use crate::{codec, CacheStats, GlobalStats, Result};
+use futures::future::{err, ok, BoxFuture};
+use std::sync::atomic::Ordering;

 const SLOTS_COUNT: usize = 128;

@@ -121,10 +130,53 @@ where
     pipe_log: P,
     global_stats: Arc<GlobalStats>,
     purge_manager: PurgeManager<E, W, P>,
+    wal_sender: Sender<LogMsg>,
     workers: Arc<RwLock<Workers>>,
 }

+fn apply_item<E, W>(
+    memtables: &MemTableAccessor<E, W>,
+    removed_memtables: &RemovedMemtables,
+    item: LogItem<E>,
+    queue: LogQueue,
+    file_num: u64,
+) where
+    E: Message + Clone,
+    W: EntryExt<E>,
+{
+    let memtable = memtables.get_or_insert(item.raft_group_id);
+    match item.content {
+        LogItemContent::Entries(entries_to_add) => {
+            let entries = entries_to_add.entries;
+            let entries_index = entries_to_add.entries_index;
+            if queue == LogQueue::Rewrite {
+                memtable.wl().append_rewrite(entries, entries_index);
+            } else {
+                memtable.wl().append(entries, entries_index);
+            }
+        }
+        LogItemContent::Command(Command::Clean) => {
+            if memtables.remove(item.raft_group_id).is_some() {
+                removed_memtables.remove_memtable(file_num, item.raft_group_id);
+            }
+        }
+        LogItemContent::Command(Command::Compact { index }) => {
+            memtable.wl().compact_to(index);
+        }
+        LogItemContent::Kv(kv) => match kv.op_type {
+            OpType::Put => {
+                let (key, value) = (kv.key, kv.value.unwrap());
+                match queue {
+                    LogQueue::Append => memtable.wl().put(key, value, file_num),
+                    LogQueue::Rewrite => memtable.wl().put_rewrite(key, value, file_num),
+                }
+            }
+            OpType::Del => memtable.wl().delete(kv.key.as_slice()),
+        },
+    }
+}
+
 impl<E, W, P> Engine<E, W, P>
 where
     E: Message + Clone,
@@ -132,49 +184,58 @@ where
     P: GenericPipeLog,
 {
     fn apply_to_memtable(&self, log_batch: &mut LogBatch<E, W>, queue: LogQueue, file_num: u64) {
+        let removed = self.purge_manager.get_removed_memtables();
         for item in log_batch.items.drain(..) {
-            let memtable = self.memtables.get_or_insert(item.raft_group_id);
-            match item.content {
-                LogItemContent::Entries(entries_to_add) => {
-                    let entries = entries_to_add.entries;
-                    let entries_index = entries_to_add.entries_index.into_inner();
-                    if queue == LogQueue::Rewrite {
-                        memtable.wl().append_rewrite(entries, entries_index);
-                    } else {
-                        memtable.wl().append(entries, entries_index);
-                    }
-                }
-                LogItemContent::Command(Command::Clean) => {
-                    if self.memtables.remove(item.raft_group_id).is_some() {
-                        self.purge_manager
-                            .remove_memtable(file_num, item.raft_group_id);
-                    }
-                }
-                LogItemContent::Command(Command::Compact { index }) => {
-                    memtable.wl().compact_to(index);
-                }
-                LogItemContent::Kv(kv) => match kv.op_type {
-                    OpType::Put => {
-                        let (key, value) = (kv.key, kv.value.unwrap());
-                        match queue {
-                            LogQueue::Append => memtable.wl().put(key, value, file_num),
-                            LogQueue::Rewrite => memtable.wl().put_rewrite(key, value, file_num),
-                        }
-                    }
-                    OpType::Del => memtable.wl().delete(kv.key.as_slice()),
-                },
-            }
+            apply_item(&self.memtables, &removed, item, queue, file_num);
         }
     }

-    fn write_impl(&self, log_batch: &mut LogBatch<E, W>, sync: bool) -> Result<usize> {
-        let queue = LogQueue::Append;
-        let mut file_num = 0;
-        let bytes = self.pipe_log.write(log_batch, sync, &mut file_num)?;
-        if file_num > 0 {
-            self.apply_to_memtable(log_batch, queue, file_num);
+    fn write_impl(
+        &self,
+        log_batch: &mut LogBatch<E, W>,
+        sync: bool,
+    ) -> BoxFuture<'static, Result<usize>> {
+        let now = Instant::now();
+        if let Some(content) = log_batch.encode_to_bytes() {
+            let (sender, r) = future_channel::oneshot::channel();
+            let bytes = content.len();
+            let task = WriteTask {
+                content,
+                sync,
+                entries_size: log_batch.entries_size(),
+                sender,
+            };
+            if self.wal_sender.send(LogMsg::Write(task)).is_err() {
+                return Box::pin(err(Error::Stop));
+            }
+            let memtables = self.memtables.clone();
+            let items = std::mem::replace(&mut log_batch.items, vec![]);
+            let removed_memtables = self.purge_manager.get_removed_memtables();
+            let stats = self.global_stats.clone();
+            return Box::pin(async move {
+                let (file_num, offset, tracker) = r.await?;
+                // Everything before this point is WAL time; what follows is
+                // memtable-apply time.
+                let t1 = now.elapsed().as_micros();
+                if file_num > 0 {
+                    for mut item in items {
+                        if let LogItemContent::Entries(entries) = &mut item.content {
+                            entries.update_position(LogQueue::Append, file_num, offset, &tracker);
+                        }
+                        apply_item(
+                            &memtables,
+                            &removed_memtables,
+                            item,
+                            LogQueue::Append,
+                            file_num,
+                        );
+                    }
+                }
+                let t2 = now.elapsed().as_micros();
+                stats.add_write_duration_change((t2 - t1) as usize, t2 as usize);
+                Ok(bytes)
+            });
         }
-        Ok(bytes)
+        Box::pin(ok(0))
     }
 }
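`write_impl` above no longer performs I/O itself: it encodes the batch, enqueues a `WriteTask` on the WAL thread, and returns a boxed future that resolves once the oneshot channel reports the final `(file_num, offset, tracker)` and the memtables have been updated. A minimal caller-side sketch, assuming the `EntryExt<Entry>` impl for `raft::eraftpb::Entry` used in this crate's tests (engine construction elided):

```rust
use futures::executor::block_on;
use raft::eraftpb::Entry;

// Sketch only: two writes issued before either is awaited, so the WAL thread
// can coalesce both into a single pwrite and at most one fsync.
fn write_two(
    engine: &RaftLogEngine<Entry, Entry>,
    b1: LogBatch<Entry, Entry>,
    b2: LogBatch<Entry, Entry>,
) -> Result<usize> {
    let f1 = engine.async_write(b1, true);
    let f2 = engine.async_write(b2, true);
    Ok(block_on(f1)? + block_on(f2)?) // total bytes written
}
```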
@@ -191,6 +252,7 @@ where
 struct Workers {
     cache_evict: Worker<CacheTask>,
+    wal: Option<JoinHandle<()>>,
 }

 impl<E, W> Engine<E, W, PipeLog>
 where
@@ -208,16 +270,7 @@ where

         let mut cache_evict_worker = Worker::new("cache_evict".to_owned(), None);

-        let pipe_log = PipeLog::open(
-            &cfg,
-            CacheSubmitor::new(
-                cache_limit,
-                chunk_limit,
-                cache_evict_worker.scheduler(),
-                global_stats.clone(),
-            ),
-        )
-        .expect("Open raft log");
+        let pipe_log = PipeLog::open(&cfg).expect("Open raft log");

         let memtables = {
             let stats = global_stats.clone();
@@ -233,6 +286,13 @@ where
         );
         cache_evict_worker.start(cache_evict_runner, Some(Duration::from_secs(1)));

+        let mut cache_submitor = CacheSubmitor::new(
+            cache_limit,
+            chunk_limit,
+            cache_evict_worker.scheduler(),
+            global_stats.clone(),
+        );
+
         let cfg = Arc::new(cfg);
         let purge_manager = PurgeManager::new(
             cfg.clone(),
@@ -240,31 +300,52 @@ where
             pipe_log.clone(),
             global_stats.clone(),
         );
-
+        let (wal_sender, wal_receiver) = channel();
         let engine = Engine {
             cfg,
             memtables,
             pipe_log,
             global_stats,
             purge_manager,
+            wal_sender,
             workers: Arc::new(RwLock::new(Workers {
                 cache_evict: cache_evict_worker,
+                wal: None,
             })),
         };

-        engine.pipe_log.cache_submitor().block_on_full();
+        cache_submitor.block_on_full();
         engine.recover(
+            &mut cache_submitor,
             LogQueue::Rewrite,
             RecoveryMode::TolerateCorruptedTailRecords,
         )?;
-        engine.recover(LogQueue::Append, engine.cfg.recovery_mode)?;
-        engine.pipe_log.cache_submitor().nonblock_on_full();
-
+        engine.recover(
+            &mut cache_submitor,
+            LogQueue::Append,
+            engine.cfg.recovery_mode,
+        )?;
+        cache_submitor.nonblock_on_full();
+
+        let mut wal_runner = WalRunner::new(cache_submitor, engine.pipe_log.clone(), wal_receiver);
+        let th = ThreadBuilder::new()
+            .name("wal".to_string())
+            .spawn(move || {
+                wal_runner
+                    .run()
+                    .unwrap_or_else(|e| warn!("run error because: {}", e))
+            })
+            .unwrap();
+        engine.workers.wl().wal = Some(th);
         Ok(engine)
     }

-    // Recover from disk.
-    fn recover(&self, queue: LogQueue, recovery_mode: RecoveryMode) -> Result<()> {
+    fn recover(
+        &self,
+        cache_submitor: &mut CacheSubmitor,
+        queue: LogQueue,
+        recovery_mode: RecoveryMode,
+    ) -> Result<()> {
         // Get first file number and last file number.
         let first_file_num = self.pipe_log.first_file_num(queue);
         let active_file_num = self.pipe_log.active_file_num(queue);
@@ -292,22 +373,20 @@ where
             buf.consume(FILE_MAGIC_HEADER.len() + VERSION.len());
             let mut offset = (FILE_MAGIC_HEADER.len() + VERSION.len()) as u64;
             loop {
-                match LogBatch::from_bytes(&mut buf, file_num, offset) {
+                match LogBatch::<E, W>::from_bytes(&mut buf, file_num, offset) {
                     Ok(Some(mut log_batch)) => {
                         if queue == LogQueue::Append {
-                            if let Some(tracker) = self.pipe_log.cache_submitor().get_cache_tracker(
-                                file_num,
-                                offset,
-                                log_batch.entries_size(),
-                            ) {
-                                for item in &log_batch.items {
-                                    if let LogItemContent::Entries(ref entries) = item.content {
+                            if let Some(tracker) =
+                                cache_submitor.get_cache_tracker(file_num, offset)
+                            {
+                                for item in log_batch.items.iter_mut() {
+                                    if let LogItemContent::Entries(entries) = &mut item.content {
                                         entries.attach_cache_tracker(tracker.clone());
                                     }
                                 }
+                                cache_submitor.fill_chunk(log_batch.entries_size());
                             }
                         }
-
                         self.apply_to_memtable(&mut log_batch, queue, file_num);
                         offset = (buf.as_ptr() as usize - start_ptr as usize) as u64;
                     }
@@ -463,7 +542,15 @@ where
     /// Write the content of LogBatch into the engine and return written bytes.
     /// If set sync true, the data will be persisted on disk by `fsync`.
     pub fn write(&self, log_batch: &mut LogBatch<E, W>, sync: bool) -> Result<usize> {
-        self.write_impl(log_batch, sync)
+        block_on(self.write_impl(log_batch, sync))
+    }
+
+    pub fn async_write(
+        &self,
+        mut log_batch: LogBatch<E, W>,
+        sync: bool,
+    ) -> BoxFuture<'static, Result<usize>> {
+        self.write_impl(&mut log_batch, sync)
     }

     /// Flush stats about EntryCache.
@@ -474,7 +561,43 @@ where
     /// Stop background thread which will keep trying evict caching.
     pub fn stop(&self) {
         let mut workers = self.workers.wl();
+        self.wal_sender.send(LogMsg::Stop).unwrap();
         workers.cache_evict.stop();
+        if let Some(wal) = workers.wal.take() {
+            wal.join().unwrap();
+        }
+    }
+
+    pub fn async_get_metric(&self) -> BoxFuture<'static, Result<Statistic>> {
+        let (sender, r) = future_channel::oneshot::channel();
+        if self.wal_sender.send(LogMsg::Metric(sender)).is_err() {
+            return Box::pin(err(Error::Stop));
+        }
+        let write_count = self.global_stats.write_count.load(Ordering::Relaxed);
+        let write_cost = self.global_stats.write_cost.load(Ordering::Relaxed);
+        let mem_cost = self.global_stats.mem_cost.load(Ordering::Relaxed);
+        let max_write_cost = self.global_stats.max_write_cost.load(Ordering::Relaxed);
+        let max_mem_cost = self.global_stats.max_mem_cost.load(Ordering::Relaxed);
+        self.global_stats
+            .write_count
+            .fetch_sub(write_count, Ordering::Relaxed);
+        self.global_stats
+            .write_cost
+            .fetch_sub(write_cost, Ordering::Relaxed);
+        self.global_stats
+            .mem_cost
+            .fetch_sub(mem_cost, Ordering::Relaxed);
+        Box::pin(async move {
+            let mut stats = r.await?;
+            // All costs are kept in microseconds; divide by the write count
+            // to get per-write averages.
+            if write_count > 0 {
+                stats.avg_write_cost = write_cost / write_count;
+                stats.avg_mem_cost = mem_cost / write_count;
+            }
+            stats.max_write_cost = max_write_cost;
+            stats.max_mem_cost = max_mem_cost;
+            Ok(stats)
+        })
     }
 }
diff --git a/src/errors.rs b/src/errors.rs
index 8313149f..61d0420e 100644
--- a/src/errors.rs
+++ b/src/errors.rs
@@ -1,3 +1,4 @@
+use futures::channel::oneshot::Canceled;
 use std::error;
 use std::io::Error as IoError;

@@ -27,6 +28,18 @@ pub enum Error {
     StorageUnavailable,
     #[error("The entry acquired has been compacted")]
     StorageCompacted,
+    #[error("write wal failed")]
+    Wal,
+    #[error("wal thread has stopped")]
+    Stop,
+    #[error("wal channel is full")]
+    Full,
+}
+
+impl From<Canceled> for Error {
+    fn from(_: Canceled) -> Error {
+        Error::Wal
+    }
 }

 pub type Result<T> = ::std::result::Result<T, Error>;
diff --git a/src/lib.rs b/src/lib.rs
index 8ca5004c..1720c23d 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -25,12 +25,14 @@ mod memtable;
 mod pipe_log;
 mod purge;
 mod util;
+mod wal;

 use crate::pipe_log::PipeLog;

 pub use self::config::{Config, RecoveryMode};
 pub use self::errors::{Error, Result};
 pub use self::log_batch::{EntryExt, LogBatch};
+pub use self::util::Statistic;

 pub type RaftLogEngine<X, Y> = self::engine::Engine<X, Y, PipeLog>;

 #[derive(Clone, Copy, Default)]
@@ -50,6 +52,11 @@ pub struct GlobalStats {
     rewrite_operations: AtomicUsize,
     // How many compacted operations in the rewrite queue.
     compacted_rewrite_operations: AtomicUsize,
+    write_count: AtomicUsize,
+    write_cost: AtomicUsize,
+    max_write_cost: AtomicUsize,
+    mem_cost: AtomicUsize,
+    max_mem_cost: AtomicUsize,
 }

 impl GlobalStats {
@@ -65,6 +72,19 @@ impl GlobalStats {
     pub fn add_cache_miss(&self, count: usize) {
         self.cache_miss.fetch_add(count, Ordering::Relaxed);
     }
+    pub fn add_write_duration_change(&self, memtable_duration: usize, write_duration: usize) {
+        self.write_count.fetch_add(1, Ordering::Relaxed);
+        self.write_cost.fetch_add(write_duration, Ordering::Relaxed);
+        self.mem_cost
+            .fetch_add(memtable_duration, Ordering::Relaxed);
+        if write_duration > self.max_write_cost.load(Ordering::Relaxed) {
+            self.max_write_cost.store(write_duration, Ordering::Relaxed);
+        }
+        if memtable_duration > self.max_mem_cost.load(Ordering::Relaxed) {
+            self.max_mem_cost
+                .store(memtable_duration, Ordering::Relaxed);
+        }
+    }

     pub fn cache_hit(&self) -> usize {
         self.cache_hit.load(Ordering::Relaxed)
diff --git a/src/log_batch.rs b/src/log_batch.rs
index 62067d11..a16d695a 100644
--- a/src/log_batch.rs
+++ b/src/log_batch.rs
@@ -1,5 +1,4 @@
 use std::borrow::{Borrow, Cow};
-use std::cell::RefCell;
 use std::io::BufRead;
 use std::marker::PhantomData;
 use std::{mem, u64};

@@ -129,7 +128,7 @@ where
 {
     pub entries: Vec<E>,
     // EntryIndex may be update after write to file.
-    pub entries_index: RefCell<Vec<EntryIndex>>,
+    pub entries_index: Vec<EntryIndex>,
 }

 impl<E: Message + PartialEq> PartialEq for Entries<E> {
@@ -140,9 +139,8 @@ impl<E: Message> Entries<E> {
     pub fn new(entries: Vec<E>, entries_index: Option<Vec<EntryIndex>>) -> Entries<E> {
-        let entries_index = RefCell::new(
-            entries_index.unwrap_or_else(|| vec![EntryIndex::default(); entries.len()]),
-        );
+        let entries_index =
+            entries_index.unwrap_or_else(|| vec![EntryIndex::default(); entries.len()]);
         Entries {
             entries,
             entries_index,
@@ -182,7 +180,7 @@ impl<E: Message> Entries<E> {
         Ok(Entries::new(entries, Some(entries_index)))
     }

-    pub fn encode_to<W>(&self, vec: &mut Vec<u8>, entries_size: &mut usize) -> Result<()>
+    pub fn encode_to<W>(&mut self, vec: &mut Vec<u8>, entries_size: &mut usize) -> Result<()>
     where
         W: EntryExt<E>,
     {
@@ -199,13 +197,12 @@ impl<E: Message> Entries<E> {
             vec.encode_var_u64(content.len() as u64)?;

             // file_num = 0 means entry index is not initialized.
-            let mut entries_index = self.entries_index.borrow_mut();
-            if entries_index[i].file_num == 0 {
-                entries_index[i].index = W::index(&e);
+            if self.entries_index[i].file_num == 0 {
+                self.entries_index[i].index = W::index(&e);
                 // This offset doesn't count the header.
-                entries_index[i].offset = vec.len() as u64;
-                entries_index[i].len = content.len() as u64;
-                *entries_size += entries_index[i].len as usize;
+                self.entries_index[i].offset = vec.len() as u64;
+                self.entries_index[i].len = content.len() as u64;
+                *entries_size += self.entries_index[i].len as usize;
             }

             vec.extend_from_slice(&content);
@@ -214,13 +211,13 @@ impl<E: Message> Entries<E> {
     }

     pub fn update_position(
-        &self,
+        &mut self,
         queue: LogQueue,
         file_num: u64,
         base: u64,
         tracker: &Option<CacheTracker>,
     ) {
-        for idx in self.entries_index.borrow_mut().iter_mut() {
+        for idx in self.entries_index.iter_mut() {
             debug_assert!(idx.file_num == 0 && idx.base_offset == 0);
             debug_assert!(idx.cache_tracker.is_none());
             idx.queue = queue;
@@ -235,8 +232,8 @@ impl<E: Message> Entries<E> {
         }
     }

-    pub fn attach_cache_tracker(&self, tracker: CacheTracker) {
-        for idx in self.entries_index.borrow_mut().iter_mut() {
+    pub fn attach_cache_tracker(&mut self, tracker: CacheTracker) {
+        for idx in self.entries_index.iter_mut() {
             let mut tkr = tracker.clone();
             tkr.global_stats.add_mem_change(idx.len as usize);
             tkr.sub_on_drop = idx.len as usize;
@@ -244,8 +241,8 @@ impl<E: Message> Entries<E> {
         }
     }

-    fn update_compression_type(&self, compression_type: CompressionType, batch_len: u64) {
-        for idx in self.entries_index.borrow_mut().iter_mut() {
+    fn update_compression_type(&mut self, compression_type: CompressionType, batch_len: u64) {
+        for idx in self.entries_index.iter_mut() {
             idx.compression_type = compression_type;
             idx.batch_len = batch_len;
         }
@@ -399,22 +396,22 @@ impl<E: Message> LogItem<E> {
         }
     }

-    pub fn encode_to<W>(&self, vec: &mut Vec<u8>, entries_size: &mut usize) -> Result<()>
+    pub fn encode_to<W>(&mut self, vec: &mut Vec<u8>, entries_size: &mut usize) -> Result<()>
     where
         W: EntryExt<E>,
     {
         // layout = { 8 byte id | 1 byte type | item layout }
         vec.encode_var_u64(self.raft_group_id)?;
-        match self.content {
-            LogItemContent::Entries(ref entries) => {
+        match &mut self.content {
+            LogItemContent::Entries(entries) => {
                 vec.push(TYPE_ENTRIES);
                 entries.encode_to::<W>(vec, entries_size)?;
             }
-            LogItemContent::Command(ref command) => {
+            LogItemContent::Command(command) => {
                 vec.push(TYPE_COMMAND);
                 command.encode_to(vec);
             }
-            LogItemContent::Kv(ref kv) => {
+            LogItemContent::Kv(kv) => {
                 vec.push(TYPE_KV);
                 kv.encode_to(vec)?;
             }
@@ -469,7 +466,7 @@ where
     W: EntryExt<E>,
 {
     pub items: Vec<LogItem<E>>,
-    entries_size: RefCell<usize>,
+    entries_size: usize,
     _phantom: PhantomData<W>,
 }

@@ -481,7 +478,7 @@ where
     fn default() -> Self {
         Self {
             items: Vec::with_capacity(16),
-            entries_size: RefCell::new(0),
+            entries_size: 0,
             _phantom: PhantomData,
         }
     }
@@ -499,7 +496,7 @@ where
     pub fn with_capacity(cap: usize) -> Self {
         Self {
             items: Vec::with_capacity(cap),
-            entries_size: RefCell::new(0),
+            entries_size: 0,
             _phantom: PhantomData,
         }
     }
@@ -579,7 +576,7 @@ where
                     file_num,
                     base_offset,
                     content_offset,
-                    &mut log_batch.entries_size.borrow_mut(),
+                    &mut log_batch.entries_size,
                 )?;
                 log_batch.items.push(item);
                 items_count -= 1;
@@ -587,8 +584,8 @@ where
             assert!(reader.is_empty());
             buf.consume(batch_len);

-            for item in &log_batch.items {
-                if let LogItemContent::Entries(ref entries) = item.content {
+            for item in log_batch.items.iter_mut() {
+                if let LogItemContent::Entries(entries) = &mut item.content {
                     entries.update_compression_type(batch_type, batch_len as u64);
                 }
             }
@@ -597,7 +594,7 @@ where
     }

     // TODO: avoid to write a large batch into one compressed chunk.
-    pub fn encode_to_bytes(&self) -> Option<Vec<u8>> {
+    pub fn encode_to_bytes(&mut self) -> Option<Vec<u8>> {
         if self.items.is_empty() {
             return None;
         }
@@ -606,8 +603,8 @@ where
         let mut vec = Vec::with_capacity(4096);
         vec.encode_u64(0).unwrap();
         vec.encode_var_u64(self.items.len() as u64).unwrap();
-        for item in &self.items {
-            item.encode_to::<W>(&mut vec, &mut *self.entries_size.borrow_mut())
+        for item in self.items.iter_mut() {
+            item.encode_to::<W>(&mut vec, &mut self.entries_size)
                 .unwrap();
         }

@@ -626,8 +623,8 @@ where
         vec.as_mut_slice().write_u64::<BigEndian>(header).unwrap();

         let batch_len = (vec.len() - 8) as u64;
-        for item in &self.items {
-            if let LogItemContent::Entries(ref entries) = item.content {
+        for item in self.items.iter_mut() {
+            if let LogItemContent::Entries(entries) = &mut item.content {
                 entries.update_compression_type(compression_type, batch_len as u64);
             }
         }
@@ -670,13 +667,13 @@ mod tests {
     fn test_entries_enc_dec() {
         let pb_entries = vec![Entry::new(); 10];
         let file_num = 1;
-        let entries = Entries::new(pb_entries, None);
+        let mut entries = Entries::new(pb_entries, None);

         let (mut encoded, mut entries_size1) = (vec![], 0);
         entries
             .encode_to::<Entry>(&mut encoded, &mut entries_size1)
             .unwrap();
-        for idx in entries.entries_index.borrow_mut().iter_mut() {
+        for idx in entries.entries_index.iter_mut() {
             idx.file_num = file_num;
         }
         let (mut s, mut entries_size2) = (encoded.as_slice(), 0);
@@ -723,8 +720,8 @@ mod tests {
             ),
         ];

-        for item in items {
+        for mut item in items {
             let (mut encoded, mut entries_size1) = (vec![], 0);
             item.encode_to::<Entry>(&mut encoded, &mut entries_size1)
                 .unwrap();
             let (mut s, mut entries_size2) = (encoded.as_slice(), 0);
diff --git a/src/memtable.rs b/src/memtable.rs
index 7e05711d..ead5f0a0 100644
--- a/src/memtable.rs
+++ b/src/memtable.rs
@@ -466,6 +466,7 @@ impl<E: Message + Clone, W: EntryExt<E>> MemTable<E, W> {
             vec_idx.extend_from_slice(first);
             vec_idx.extend_from_slice(second);
         }
+
         let (hit, miss) = (vec.len() - vec_len, vec_idx.len() - vec_idx_len);
         self.global_stats.add_cache_hit(hit);
         self.global_stats.add_cache_miss(miss);
diff --git a/src/pipe_log.rs b/src/pipe_log.rs
index 13b7d5b6..ddf05493 100644
--- a/src/pipe_log.rs
+++ b/src/pipe_log.rs
@@ -3,7 +3,7 @@ use std::fs::{self, File};
 use std::io::{Error as IoError, ErrorKind as IoErrorKind, Read};
 use std::os::unix::io::RawFd;
 use std::path::{Path, PathBuf};
-use std::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
+use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
 use std::{cmp, u64};

 use log::{info, warn};
@@ -15,7 +15,6 @@ use nix::unistd::{close, fsync, ftruncate, lseek, Whence};
 use nix::NixPath;
 use protobuf::Message;

-use crate::cache_evict::CacheSubmitor;
 use crate::config::Config;
 use crate::log_batch::{EntryExt, LogBatch, LogItemContent};
 use crate::util::HandyRwLock;
@@ -43,29 +42,19 @@ pub trait GenericPipeLog: Sized + Clone + Send {
     /// Read some bytes from the given position.
     fn fread(&self, queue: LogQueue, file_num: u64, offset: u64, len: u64) -> Result<Vec<u8>>;

+    /// Check whether the size of the current write log has reached the rotate
+    /// limit, switching to a new file if so.
+    fn switch_log_file(&self, queue: LogQueue) -> Result<(u64, Arc<LogFd>)>;
+
     /// Append some bytes to the given queue.
-    fn append(
-        &self,
-        queue: LogQueue,
-        content: &[u8],
-        sync: &mut bool,
-    ) -> Result<(u64, u64, Arc<LogFd>)>;
+    fn append(&self, queue: LogQueue, content: &[u8]) -> Result<u64>;

     /// Close the pipe log.
     fn close(&self) -> Result<()>;

-    /// Write a batch into the append queue.
-    fn write<E: Message, W: EntryExt<E>>(
-        &self,
-        batch: &LogBatch<E, W>,
-        sync: bool,
-        file_num: &mut u64,
-    ) -> Result<usize>;
-
     /// Rewrite a batch into the rewrite queue.
     fn rewrite<E: Message, W: EntryExt<E>>(
         &self,
-        batch: &LogBatch<E, W>,
+        batch: &mut LogBatch<E, W>,
         sync: bool,
         file_num: &mut u64,
     ) -> Result<usize>;
@@ -97,8 +86,6 @@ pub trait GenericPipeLog: Sized + Clone + Send {
     fn latest_file_before(&self, queue: LogQueue, size: usize) -> u64;

     fn file_len(&self, queue: LogQueue, file_num: u64) -> Result<u64>;
-
-    fn cache_submitor(&self) -> MutexGuard<CacheSubmitor>;
 }

 pub struct LogFd(RawFd);

@@ -107,6 +94,9 @@ impl LogFd {
     fn close(&self) -> Result<()> {
         close(self.0).map_err(|e| parse_nix_error(e, "close"))
     }
+    pub fn sync(&self) -> Result<()> {
+        fsync(self.0).map_err(|e| parse_nix_error(e, "fsync"))
+    }
 }

 impl Drop for LogFd {
@@ -119,8 +109,6 @@ impl Drop for LogFd {

 struct LogManager {
     dir: String,
-    rotate_size: usize,
-    bytes_per_sync: usize,
     name_suffix: &'static str,

     pub first_file_num: u64,
@@ -137,8 +125,6 @@ impl LogManager {
     fn new(cfg: &Config, name_suffix: &'static str) -> Self {
         Self {
             dir: cfg.dir.clone(),
-            rotate_size: cfg.target_file_size.0 as usize,
-            bytes_per_sync: cfg.bytes_per_sync.0 as usize,
             name_suffix,

             first_file_num: INIT_FILE_NUM,
@@ -170,7 +156,7 @@ impl LogManager {
             let mut path = PathBuf::from(&self.dir);
             path.push(logs.last().unwrap());
             let fd = Arc::new(LogFd(open_active_file(&path)?));
-            fsync(fd.0).map_err(|e| parse_nix_error(e, "fsync"))?;
+            fd.sync()?;

             self.active_log_size = file_len(fd.0)?;
             self.active_log_capacity = self.active_log_size;
@@ -184,7 +170,9 @@ impl LogManager {
         if self.active_file_num > 0 {
             self.truncate_active_log(None)?;
         }
-
+        if let Some(last_file) = self.all_files.back() {
+            last_file.sync()?;
+        }
         self.active_file_num += 1;

         let mut path = PathBuf::from(&self.dir);
@@ -215,7 +203,7 @@ impl LogManager {
             // After truncate to 0, write header is necessary.
             offset = write_file_header(active_fd.0)?;
         }
-        fsync(active_fd.0).map_err(|e| parse_nix_error(e, "fsync"))?;
+        active_fd.sync()?;
         self.active_log_size = offset;
         self.active_log_capacity = offset;
         self.last_sync_size = self.active_log_size;
@@ -246,16 +234,7 @@ impl LogManager {
         Ok(end_offset)
     }

-    fn reach_sync_limit(&self) -> bool {
-        self.active_log_size - self.last_sync_size >= self.bytes_per_sync
-    }
-
-    fn on_append(&mut self, content_len: usize, sync: &mut bool) -> Result<(u64, u64, Arc<LogFd>)> {
-        if self.active_log_size >= self.rotate_size {
-            self.new_log_file()?;
-        }
-
-        let active_file_num = self.active_file_num;
+    fn on_append(&mut self, content_len: usize) -> Result<(u64, Arc<LogFd>)> {
         let active_log_size = self.active_log_size;
         let fd = self.get_active_fd().unwrap();

@@ -276,11 +255,7 @@ impl LogManager {
             self.active_log_capacity += alloc_size;
         }

-        if *sync || self.reach_sync_limit() {
-            self.last_sync_size = self.active_log_size;
-            *sync = true;
-        }
-        Ok((active_file_num, active_log_size as u64, fd))
+        Ok((active_log_size as u64, fd))
     }
 }

@@ -298,11 +273,10 @@ pub struct PipeLog {

     appender: Arc<RwLock<LogManager>>,
     rewriter: Arc<RwLock<LogManager>>,
-    cache_submitor: Arc<Mutex<CacheSubmitor>>,
 }

 impl PipeLog {
-    fn new(cfg: &Config, cache_submitor: CacheSubmitor) -> PipeLog {
+    fn new(cfg: &Config) -> PipeLog {
         let appender = Arc::new(RwLock::new(LogManager::new(&cfg, LOG_SUFFIX)));
         let rewriter = Arc::new(RwLock::new(LogManager::new(&cfg, REWRITE_SUFFIX)));
         PipeLog {
@@ -311,11 +285,10 @@ impl PipeLog {
             bytes_per_sync: cfg.bytes_per_sync.0 as usize,
             appender,
             rewriter,
-            cache_submitor: Arc::new(Mutex::new(cache_submitor)),
         }
     }

-    pub fn open(cfg: &Config, cache_submitor: CacheSubmitor) -> Result<PipeLog> {
+    pub fn open(cfg: &Config) -> Result<PipeLog> {
         let path = Path::new(&cfg.dir);
         if !path.exists() {
             info!("Create raft log directory: {}", &cfg.dir);
@@ -325,7 +298,7 @@ impl PipeLog {
             return Err(box_err!("Not directory: {}", &cfg.dir));
         }

-        let pipe_log = PipeLog::new(cfg, cache_submitor);
+        let pipe_log = PipeLog::new(cfg);

         let (mut min_file_num, mut max_file_num): (u64, u64) = (u64::MAX, 0);
         let (mut min_rewrite_num, mut max_rewrite_num): (u64, u64) = (u64::MAX, 0);
@@ -425,66 +398,42 @@ impl GenericPipeLog for PipeLog {
         pread_exact(fd.0, offset, len as usize)
     }

-    fn append(
-        &self,
-        queue: LogQueue,
-        content: &[u8],
-        sync: &mut bool,
-    ) -> Result<(u64, u64, Arc<LogFd>)> {
-        let (file_num, offset, fd) = self.mut_queue(queue).on_append(content.len(), sync)?;
+    fn switch_log_file(&self, queue: LogQueue) -> Result<(u64, Arc<LogFd>)> {
+        let mut writer = self.mut_queue(queue);
+        if writer.active_log_size >= self.rotate_size {
+            writer.new_log_file()?;
+        }
+        let fd = writer.get_active_fd().unwrap();
+
+        Ok((writer.active_file_num, fd))
+    }
+
+    fn append(&self, queue: LogQueue, content: &[u8]) -> Result<u64> {
+        let (offset, fd) = self.mut_queue(queue).on_append(content.len())?;
         pwrite_exact(fd.0, offset, content)?;
-        Ok((file_num, offset, fd))
+        Ok(offset)
     }

     fn close(&self) -> Result<()> {
-        let _write_lock = self.cache_submitor.lock().unwrap();
-
         self.appender.wl().truncate_active_log(None)?;
         self.rewriter.wl().truncate_active_log(None)?;
         Ok(())
     }

-    fn write<E: Message, W: EntryExt<E>>(
-        &self,
-        batch: &LogBatch<E, W>,
-        mut sync: bool,
-        file_num: &mut u64,
-    ) -> Result<usize> {
-        if let Some(content) = batch.encode_to_bytes() {
-            // TODO: `pwrite` is performed in the mutex. Is it possible for concurrence?
-            let mut cache_submitor = self.cache_submitor.lock().unwrap();
-            let (cur_file_num, offset, fd) = self.append(LogQueue::Append, &content, &mut sync)?;
-            let tracker =
-                cache_submitor.get_cache_tracker(cur_file_num, offset, batch.entries_size());
-            drop(cache_submitor);
-            if sync {
-                fsync(fd.0).map_err(|e| parse_nix_error(e, "fsync"))?;
-            }
-
-            for item in &batch.items {
-                if let LogItemContent::Entries(ref entries) = item.content {
-                    entries.update_position(LogQueue::Append, cur_file_num, offset, &tracker);
-                }
-            }
-
-            *file_num = cur_file_num;
-            return Ok(content.len());
-        }
-        Ok(0)
-    }
-
     fn rewrite<E: Message, W: EntryExt<E>>(
         &self,
-        batch: &LogBatch<E, W>,
-        mut sync: bool,
+        batch: &mut LogBatch<E, W>,
+        sync: bool,
         file_num: &mut u64,
     ) -> Result<usize> {
         if let Some(content) = batch.encode_to_bytes() {
-            let (cur_file_num, offset, fd) = self.append(LogQueue::Rewrite, &content, &mut sync)?;
+            let (cur_file_num, fd) = self.switch_log_file(LogQueue::Rewrite)?;
+            let offset = self.append(LogQueue::Rewrite, &content)?;
             if sync {
-                fsync(fd.0).map_err(|e| parse_nix_error(e, "fsync"))?;
+                fd.sync()?;
             }
-            for item in &batch.items {
-                if let LogItemContent::Entries(ref entries) = item.content {
+            for item in batch.items.iter_mut() {
+                if let LogItemContent::Entries(entries) = &mut item.content {
                     entries.update_position(LogQueue::Rewrite, cur_file_num, offset, &None);
                 }
             }
@@ -504,7 +453,7 @@ impl GenericPipeLog for PipeLog {

     fn sync(&self, queue: LogQueue) -> Result<()> {
         if let Some(fd) = self.get_queue(queue).get_active_fd() {
-            fsync(fd.0).map_err(|e| parse_nix_error(e, "fsync"))?;
+            fd.sync()?;
         }
         Ok(())
     }
@@ -570,10 +519,6 @@ impl GenericPipeLog for PipeLog {
             .get_fd(file_num)
             .map(|fd| file_len(fd.0).unwrap() as u64)
     }
-
-    fn cache_submitor(&self) -> MutexGuard<CacheSubmitor> {
-        self.cache_submitor.lock().unwrap()
-    }
 }

 fn generate_file_name(file_num: u64, suffix: &'static str) -> String {
@@ -672,7 +617,6 @@ fn write_file_header(fd: RawFd) -> Result<u64> {
 mod tests {
     use std::time::Duration;

-    use crossbeam::channel::Receiver;
     use raft::eraftpb::Entry;
     use tempfile::Builder;

@@ -681,21 +625,14 @@ mod tests {
     use crate::util::{ReadableSize, Worker};
     use crate::GlobalStats;

-    fn new_test_pipe_log(
-        path: &str,
-        bytes_per_sync: usize,
-        rotate_size: usize,
-    ) -> (PipeLog, Receiver<Option<CacheTask>>) {
+    fn new_test_pipe_log(path: &str, bytes_per_sync: usize, rotate_size: usize) -> PipeLog {
         let mut cfg = Config::default();
         cfg.dir = path.to_owned();
         cfg.bytes_per_sync = ReadableSize(bytes_per_sync as u64);
         cfg.target_file_size = ReadableSize(rotate_size as u64);

-        let mut worker = Worker::new("test".to_owned(), None);
-        let stats = Arc::new(GlobalStats::default());
-        let submitor = CacheSubmitor::new(usize::MAX, 4096, worker.scheduler(), stats);
-        let log = PipeLog::open(&cfg, submitor).unwrap();
-        (log, worker.take_receiver())
+        PipeLog::open(&cfg).unwrap()
     }

     #[test]
@@ -719,7 +656,7 @@ mod tests {
         let rotate_size = 1024;
         let bytes_per_sync = 32 * 1024;

-        let (pipe_log, _) = new_test_pipe_log(path, bytes_per_sync, rotate_size);
+        let pipe_log = new_test_pipe_log(path, bytes_per_sync, rotate_size);
         assert_eq!(pipe_log.first_file_num(queue), INIT_FILE_NUM);
         assert_eq!(pipe_log.active_file_num(queue), INIT_FILE_NUM);

@@ -727,12 +664,14 @@ mod tests {

         // generate file 1, 2, 3
         let content: Vec<u8> = vec![b'a'; 1024];
-        let (file_num, offset, _) = pipe_log.append(queue, &content, &mut false).unwrap();
+        let (file_num, _) = pipe_log.switch_log_file(queue).unwrap();
+        let offset = pipe_log.append(queue, &content).unwrap();
         assert_eq!(file_num, 1);
         assert_eq!(offset, header_size);
         assert_eq!(pipe_log.active_file_num(queue), 1);

-        let (file_num, offset, _) = pipe_log.append(queue, &content, &mut false).unwrap();
+        let (file_num, _) = pipe_log.switch_log_file(queue).unwrap();
+        let offset = pipe_log.append(queue, &content).unwrap();
         assert_eq!(file_num, 2);
         assert_eq!(offset, header_size);
         assert_eq!(pipe_log.active_file_num(queue), 2);
@@ -746,11 +685,13 @@ mod tests {

         // append position
         let s_content = b"short content".to_vec();
-        let (file_num, offset, _) = pipe_log.append(queue, &s_content, &mut false).unwrap();
+        let (file_num, _) = pipe_log.switch_log_file(queue).unwrap();
+        let offset = pipe_log.append(queue, &s_content).unwrap();
         assert_eq!(file_num, 3);
         assert_eq!(offset, header_size);

-        let (file_num, offset, _) = pipe_log.append(queue, &s_content, &mut false).unwrap();
+        let (file_num, _) = pipe_log.switch_log_file(queue).unwrap();
+        let offset = pipe_log.append(queue, &s_content).unwrap();
         assert_eq!(file_num, 3);
         assert_eq!(offset, header_size + s_content.len() as u64);

@@ -797,7 +738,7 @@ mod tests {
         pipe_log.close().unwrap();

         // reopen
-        let (pipe_log, _) = new_test_pipe_log(path, bytes_per_sync, rotate_size);
+        let pipe_log = new_test_pipe_log(path, bytes_per_sync, rotate_size);
         assert_eq!(pipe_log.active_file_num(queue), 3);
         assert_eq!(
             pipe_log.active_log_size(queue),
@@ -819,6 +760,29 @@ mod tests {
         test_pipe_log_impl(LogQueue::Rewrite)
     }

+    fn write_to_log(
+        log: &mut PipeLog,
+        submitor: &mut CacheSubmitor,
+        batch: &mut LogBatch<Entry, Entry>,
+        file_num: &mut u64,
+    ) {
+        if let Some(content) = batch.encode_to_bytes() {
+            let (cur_file_num, fd) = log.switch_log_file(LogQueue::Append).unwrap();
+            let offset = log.append(LogQueue::Append, &content).unwrap();
+            let tracker = submitor.get_cache_tracker(cur_file_num, offset);
+            submitor.fill_chunk(batch.entries_size());
+            for item in batch.items.iter_mut() {
+                if let LogItemContent::Entries(entries) = &mut item.content {
+                    entries.update_position(LogQueue::Append, cur_file_num, offset, &tracker);
+                }
+            }
+            fd.sync().unwrap();
+            *file_num = cur_file_num;
+        }
+    }
+
     #[test]
     fn test_cache_submitor() {
         let dir = Builder::new().prefix("test_pipe_log").tempdir().unwrap();
@@ -826,7 +790,11 @@ mod tests {
         let rotate_size = 6 * 1024; // 6K to rotate.
         let bytes_per_sync = 32 * 1024;

-        let (pipe_log, receiver) = new_test_pipe_log(path, bytes_per_sync, rotate_size);
+        let mut worker = Worker::new("test".to_owned(), None);
+        let mut pipe_log = new_test_pipe_log(path, bytes_per_sync, rotate_size);
+        let receiver = worker.take_receiver();
+        let stats = Arc::new(GlobalStats::default());
+        let mut submitor = CacheSubmitor::new(usize::MAX, 4096, worker.scheduler(), stats);

         let get_1m_batch = || {
             let mut entry = Entry::new();
@@ -844,9 +812,9 @@ mod tests {
         // After 4 batches are written into pipe log, no `CacheTask::NewChunk`
         // task should be triggered. However the last batch will trigger it.
         for i in 0..5 {
-            let log_batch = get_1m_batch();
+            let mut log_batch = get_1m_batch();
             let mut file_num = 0;
-            pipe_log.write(&log_batch, true, &mut file_num).unwrap();
+            write_to_log(&mut pipe_log, &mut submitor, &mut log_batch, &mut file_num);
             log_batches.push(log_batch);
             let x = receiver.recv_timeout(Duration::from_millis(100));
             if i < 4 {
@@ -859,9 +827,9 @@ mod tests {
         // Write more 2 batches into pipe log. A `CacheTask::NewChunk` will be
         // emit on the second batch because log file is switched.
         for i in 5..7 {
-            let log_batch = get_1m_batch();
+            let mut log_batch = get_1m_batch();
             let mut file_num = 0;
-            pipe_log.write(&log_batch, true, &mut file_num).unwrap();
+            write_to_log(&mut pipe_log, &mut submitor, &mut log_batch, &mut file_num);
             log_batches.push(log_batch);
             let x = receiver.recv_timeout(Duration::from_millis(100));
             if i < 6 {
@@ -875,9 +843,9 @@ mod tests {
         // `CacheTracker`s accociated in `EntryIndex`s are droped.
         drop(log_batches);
         for _ in 7..20 {
-            let log_batch = get_1m_batch();
+            let mut log_batch = get_1m_batch();
             let mut file_num = 0;
-            pipe_log.write(&log_batch, true, &mut file_num).unwrap();
+            write_to_log(&mut pipe_log, &mut submitor, &mut log_batch, &mut file_num);
             drop(log_batch);
             assert!(receiver.recv_timeout(Duration::from_millis(100)).is_err());
         }
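With `GenericPipeLog::write` gone, rotation and syncing become the caller's job, as the `write_to_log` test helper above shows. The protocol is `switch_log_file` (rotates once the active file passes `rotate_size`), then `append`, then `LogFd::sync` if durability is required; a single writer thread must drive these steps in order, or the returned `(file_num, offset)` may not describe where the bytes actually landed. A condensed sketch of an append-queue write under the trait in this patch (the `append_once` helper is hypothetical):

```rust
use crate::pipe_log::{GenericPipeLog, LogQueue, PipeLog};

// Sketch only: the caller-side sequence that replaces the old PipeLog::write.
fn append_once(pipe_log: &PipeLog, content: &[u8], sync: bool) -> crate::Result<(u64, u64)> {
    let (file_num, fd) = pipe_log.switch_log_file(LogQueue::Append)?; // rotate if needed
    let offset = pipe_log.append(LogQueue::Append, content)?; // pwrite at the tail
    if sync {
        fd.sync()?; // fsync is explicit now and owned by the caller
    }
    Ok((file_num, offset))
}
```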
diff --git a/src/purge.rs b/src/purge.rs
index 60bc5103..c6b56fcb 100644
--- a/src/purge.rs
+++ b/src/purge.rs
@@ -12,6 +12,16 @@ use crate::pipe_log::{GenericPipeLog, LogQueue};
 use crate::util::HandyRwLock;
 use crate::{GlobalStats, Result};

+#[derive(Clone, Default)]
+pub struct RemovedMemtables(Arc<Mutex<BinaryHeap<(Reverse<u64>, u64)>>>);
+
+impl RemovedMemtables {
+    pub fn remove_memtable(&self, file_num: u64, raft_group_id: u64) {
+        let mut tables = self.0.lock().unwrap();
+        tables.push((Reverse(file_num), raft_group_id));
+    }
+}
+
 // If a region has some very old raft logs less than this threshold,
 // rewrite them to clean stale log files ASAP.
 const REWRITE_ENTRY_COUNT_THRESHOLD: usize = 32;
@@ -31,8 +41,7 @@ where
     global_stats: Arc<GlobalStats>,

     // Vector of (file_num, raft_group_id).
-    #[allow(clippy::type_complexity)]
-    removed_memtables: Arc<Mutex<BinaryHeap<(Reverse<u64>, u64)>>>,
+    removed_memtables: RemovedMemtables,

     // Only one thread can run `purge_expired_files` at a time.
     purge_mutex: Arc<Mutex<()>>,
@@ -115,9 +124,8 @@ where
         }
     }

-    pub fn remove_memtable(&self, file_num: u64, raft_group_id: u64) {
-        let mut tables = self.removed_memtables.lock().unwrap();
-        tables.push((Reverse(file_num), raft_group_id));
+    pub fn get_removed_memtables(&self) -> RemovedMemtables {
+        self.removed_memtables.clone()
     }

     // Returns (`latest_needs_rewrite`, `latest_needs_force_compact`).
@@ -159,13 +167,16 @@ where
         assert!(latest_compact <= latest_rewrite);

         let mut log_batch = LogBatch::<E, W>::new();
-        while let Some(item) = self.removed_memtables.lock().unwrap().pop() {
-            let (file_num, raft_id) = ((item.0).0, item.1);
-            if file_num > latest_rewrite {
-                self.removed_memtables.lock().unwrap().push(item);
-                break;
+        {
+            let mut guard = self.removed_memtables.0.lock().unwrap();
+            while let Some(item) = guard.pop() {
+                let (file_num, raft_id) = ((item.0).0, item.1);
+                if file_num > latest_rewrite {
+                    guard.push(item);
+                    break;
+                }
+                log_batch.clean_region(raft_id);
             }
-            log_batch.clean_region(raft_id);
         }

         let memtables = self.memtables.collect(|t| {
@@ -209,7 +220,7 @@ where
         latest_rewrite: Option<u64>,
     ) -> Result<()> {
         let mut file_num = 0;
-        self.pipe_log.rewrite(&log_batch, true, &mut file_num)?;
+        self.pipe_log.rewrite(log_batch, true, &mut file_num)?;
         if file_num > 0 {
             self.rewrite_to_memtable(log_batch, file_num, latest_rewrite);
         }
@@ -226,7 +237,7 @@ where
             let memtable = self.memtables.get_or_insert(item.raft_group_id);
             match item.content {
                 LogItemContent::Entries(entries_to_add) => {
-                    let entries_index = entries_to_add.entries_index.into_inner();
+                    let entries_index = entries_to_add.entries_index;
                     memtable.wl().rewrite(entries_index, latest_rewrite);
                 }
                 LogItemContent::Kv(kv) => match kv.op_type {
@@ -293,16 +304,17 @@ mod tests {
         cfg.dir = dir.path().to_str().unwrap().to_owned();
         let engine = RaftLogEngine::new(cfg.clone());

-        engine.purge_manager().remove_memtable(3, 10);
-        engine.purge_manager().remove_memtable(3, 9);
-        engine.purge_manager().remove_memtable(3, 11);
-        engine.purge_manager().remove_memtable(2, 9);
-        engine.purge_manager().remove_memtable(4, 4);
-        engine.purge_manager().remove_memtable(4, 3);
+        let tables = engine.purge_manager().get_removed_memtables();
+        tables.remove_memtable(3, 10);
+        tables.remove_memtable(3, 9);
+        tables.remove_memtable(3, 11);
+        tables.remove_memtable(2, 9);
+        tables.remove_memtable(4, 4);
+        tables.remove_memtable(4, 3);

-        let mut tables = engine.purge_manager().removed_memtables.lock().unwrap();
+        let mut guard = tables.0.lock().unwrap();
         for (file_num, raft_id) in vec![(2, 9), (3, 11), (3, 10), (3, 9), (4, 4), (4, 3)] {
-            let item = tables.pop().unwrap();
+            let item = guard.pop().unwrap();
             assert_eq!((item.0).0, file_num);
             assert_eq!(item.1, raft_id);
         }
diff --git a/src/util.rs b/src/util.rs
index d3dd63b8..3c2d921d 100644
--- a/src/util.rs
+++ b/src/util.rs
@@ -321,3 +321,55 @@ impl<T> Drop for Worker<T> {
         self.stop();
     }
 }
+
+#[derive(Clone, Debug, Copy, PartialEq, Default)]
+pub struct Statistic {
+    pub avg_wal_cost: usize,
+    pub avg_sync_cost: usize,
+    pub avg_write_cost: usize,
+    pub avg_mem_cost: usize,
+    pub max_wal_cost: usize,
+    pub max_sync_cost: usize,
+    pub max_write_cost: usize,
+    pub max_mem_cost: usize,
+    pub freq: usize,
+}
+
+fn max(left: usize, right: usize) -> usize {
+    if left > right {
+        left
+    } else {
+        right
+    }
+}
+
+impl Statistic {
+    pub fn clear(&mut self) {
+        self.avg_wal_cost = 0;
+        self.avg_sync_cost = 0;
+        self.avg_write_cost = 0;
+        self.max_wal_cost = 0;
+        self.max_sync_cost = 0;
+        self.max_write_cost = 0;
+        self.freq = 0;
+    }
+
+    #[inline]
+    pub fn add_wal(&mut self, wal: usize) {
+        self.avg_wal_cost += wal;
+        self.max_wal_cost = max(self.max_wal_cost, wal);
+    }
+
+    #[inline]
+    pub fn add_sync(&mut self, sync: usize) {
+        self.avg_sync_cost += sync;
+        self.max_sync_cost = max(self.max_sync_cost, sync);
+    }
+
+    pub fn divide(&mut self) {
+        if self.freq > 0 {
+            self.avg_wal_cost /= self.freq;
+            self.avg_sync_cost /= self.freq;
+        }
+    }
+}
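`Statistic` keeps running totals in the `avg_*` fields until `divide` converts them into per-write averages over `freq` samples (all values are microseconds); `clear` then resets the window. A small worked sketch of one reporting window, with assumed sample values:

```rust
use crate::util::Statistic;

// Sketch only: three writes in one window, costs in microseconds.
fn statistic_window() {
    let mut stats = Statistic::default();
    for wal_cost in [120usize, 80, 100] {
        stats.add_wal(wal_cost); // running total 300, running max 120
        stats.add_sync(wal_cost / 2); // the fsync share of each write
        stats.freq += 1;
    }
    stats.divide();
    assert_eq!(stats.avg_wal_cost, 100); // 300 / 3
    assert_eq!(stats.max_wal_cost, 120);
    stats.clear(); // start the next window
}
```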
diff --git a/src/wal.rs b/src/wal.rs
new file mode 100644
index 00000000..eab62cf0
--- /dev/null
+++ b/src/wal.rs
@@ -0,0 +1,133 @@
+use futures::channel::oneshot::Sender;
+use std::sync::mpsc::Receiver;
+
+use crate::cache_evict::{CacheSubmitor, CacheTracker};
+use crate::errors::Result;
+use crate::pipe_log::{GenericPipeLog, LogQueue};
+use crate::util::Statistic;
+use log::warn;
+use std::time::Instant;
+
+pub struct WriteTask {
+    pub content: Vec<u8>,
+    pub entries_size: usize,
+    pub sync: bool,
+    pub sender: Sender<(u64, u64, Option<CacheTracker>)>,
+}
+
+pub enum LogMsg {
+    Write(WriteTask),
+    Metric(Sender<Statistic>),
+    Stop,
+}
+
+pub struct WalRunner<P: GenericPipeLog> {
+    cache_submitter: CacheSubmitor,
+    pipe_log: P,
+    receiver: Receiver<LogMsg>,
+    statistic: Statistic,
+}
+
+impl<P: GenericPipeLog> WalRunner<P> {
+    pub fn new(
+        cache_submitter: CacheSubmitor,
+        pipe_log: P,
+        receiver: Receiver<LogMsg>,
+    ) -> WalRunner<P> {
+        WalRunner {
+            pipe_log,
+            cache_submitter,
+            receiver,
+            statistic: Statistic::default(),
+        }
+    }
+}
+
+impl<P> WalRunner<P>
+where
+    P: GenericPipeLog,
+{
+    pub fn run(&mut self) -> Result<()> {
+        let mut write_ret = vec![];
+        const MAX_WRITE_BUFFER: usize = 1024 * 1024; // 1MB
+        let mut write_buffer = Vec::with_capacity(MAX_WRITE_BUFFER);
+        while let Ok(msg) = self.receiver.recv() {
+            let task = match msg {
+                LogMsg::Write(task) => task,
+                LogMsg::Stop => {
+                    return Ok(());
+                }
+                LogMsg::Metric(cb) => {
+                    self.report(cb);
+                    continue;
+                }
+            };
+            let now = Instant::now();
+            let mut sync = task.sync;
+            let mut entries_size = task.entries_size;
+            let (file_num, fd) = self.pipe_log.switch_log_file(LogQueue::Append).unwrap();
+            write_ret.push((0, task.sender));
+
+            while let Ok(msg) = self.receiver.try_recv() {
+                if write_buffer.is_empty() {
+                    write_buffer.extend_from_slice(&task.content);
+                }
+                let task = match msg {
+                    LogMsg::Write(task) => task,
+                    LogMsg::Stop => {
+                        return Ok(());
+                    }
+                    LogMsg::Metric(cb) => {
+                        self.report(cb);
+                        continue;
+                    }
+                };
+                if task.sync && !sync {
+                    sync = true;
+                }
+                entries_size += task.entries_size;
+                write_ret.push((write_buffer.len() as u64, task.sender));
+                write_buffer.extend_from_slice(&task.content);
+                if write_buffer.len() >= MAX_WRITE_BUFFER {
+                    break;
+                }
+            }
+            let base_offset = if write_buffer.is_empty() {
+                self.pipe_log
+                    .append(LogQueue::Append, &task.content)
+                    .unwrap()
+            } else {
+                self.pipe_log
+                    .append(LogQueue::Append, &write_buffer)
+                    .unwrap()
+            };
+            let before_sync_cost = now.elapsed().as_micros();
+            if sync {
+                if let Err(e) = fd.sync() {
+                    warn!("write wal failed because of: {}", e);
+                    write_ret.clear();
+                }
+            }
+            let tracker = self
+                .cache_submitter
+                .get_cache_tracker(file_num, base_offset);
+            self.cache_submitter.fill_chunk(entries_size);
+            let wal_cost = now.elapsed().as_micros();
+            self.statistic.add_wal(wal_cost as usize);
+            self.statistic
+                .add_sync((wal_cost - before_sync_cost) as usize);
+            self.statistic.freq += 1;
+            for (offset, sender) in write_ret.drain(..) {
+                let _ = sender.send((file_num, base_offset + offset, tracker.clone()));
+            }
+            write_buffer.clear();
+        }
+        Ok(())
+    }
+
+    pub fn report(&mut self, cb: Sender<Statistic>) {
+        self.statistic.divide();
+        let _ = cb.send(self.statistic.clone());
+        self.statistic.clear();
+    }
+}
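End-to-end, the new write path is: `Engine::write_impl` enqueues encoded batches as `LogMsg::Write`; the `wal` thread drains whatever else is queued into one buffer (up to `MAX_WRITE_BUFFER`), performs a single `append` plus at most one `fsync`, and answers each task's oneshot sender with `(file_num, base_offset + offset, tracker)`. Metrics travel the same channel. A minimal sketch of reading them back, assuming an engine built as in `Engine::new` above and the test `EntryExt<Entry>` impl:

```rust
use futures::executor::block_on;
use raft::eraftpb::Entry;

// Sketch only: drain the per-window counters into a Statistic snapshot.
fn report(engine: &RaftLogEngine<Entry, Entry>) -> Result<()> {
    let stats = block_on(engine.async_get_metric())?; // Err(Error::Stop) once stopped
    // avg_wal_cost / avg_sync_cost come from WalRunner::report; avg_write_cost /
    // avg_mem_cost are derived from GlobalStats. All values are in microseconds.
    println!(
        "avg write {}us (max {}us), avg wal {}us",
        stats.avg_write_cost, stats.max_write_cost, stats.avg_wal_cost
    );
    Ok(())
}
```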