From 354dbf1840365e5d264b44d249b8631bd3ec0b96 Mon Sep 17 00:00:00 2001
From: qupeng
Date: Mon, 19 Oct 2020 17:33:42 +0800
Subject: [PATCH] introduce iovec

Signed-off-by: qupeng
---
 Cargo.toml         |   5 +-
 src/compression.rs | 176 ++++++++++++++++++++++++++++
 src/engine.rs      |  17 ++-
 src/iovec.rs       | 282 +++++++++++++++++++++++++++++++++++++++++++--
 src/lib.rs         |   8 +-
 src/log_batch.rs   | 205 ++++++++++++--------------------
 src/pipe_log.rs    | 137 ++++++++++++++++------
 src/util.rs        |   8 ++
 8 files changed, 653 insertions(+), 185 deletions(-)
 create mode 100644 src/compression.rs

diff --git a/Cargo.toml b/Cargo.toml
index f7f4b53e..b12378a5 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -16,11 +16,14 @@ fxhash = "0.2"
 nix = "0.18.0"
 crossbeam = "0.7"
 thiserror = "1.0"
+libc = "0.2"

 [dev-dependencies]
 raft = { git = "https://github.com/tikv/raft-rs", branch = "master", default-features = false, features = ["protobuf-codec"] }
 tempfile = "3.1"
 toml = "0.5"
+ctor = "0.1"
+env_logger = "0.8"

 [patch.crates-io]
-protobuf = { git = "https://github.com/pingcap/rust-protobuf", rev = "ec745253ab847481647887bc4c7cac3949449cfe" }
+protobuf = { git = "https://github.com/pingcap/rust-protobuf", rev = "ac10abf324a6f2b3e19e10f82b568a293ca5bd3d" }
diff --git a/src/compression.rs b/src/compression.rs
new file mode 100644
index 00000000..9e3f341e
--- /dev/null
+++ b/src/compression.rs
@@ -0,0 +1,176 @@
+use std::i32;
+use std::ptr::{copy_nonoverlapping, read_unaligned};
+
+use libc::c_int;
+use lz4_sys::{
+    LZ4StreamEncode, LZ4_compressBound, LZ4_compress_default, LZ4_createStreamDecode,
+    LZ4_decompress_safe, LZ4_decompress_safe_continue, LZ4_freeStreamDecode,
+};
+
+// Layout of single block compression:
+// header + decoded_size + content + cap(tail).
+pub fn encode_block(src: &[u8], head_reserve: usize, tail_alloc: usize) -> Vec<u8> {
+    unsafe {
+        let bound = LZ4_compressBound(src.len() as i32);
+        assert!(bound > 0 && src.len() <= i32::MAX as usize);
+
+        let capacity = head_reserve + 4 + bound as usize + tail_alloc;
+        let mut output: Vec<u8> = Vec::with_capacity(capacity);
+
+        let le_len = src.len().to_le_bytes();
+        copy_nonoverlapping(le_len.as_ptr(), output.as_mut_ptr().add(head_reserve), 4);
+
+        let size = LZ4_compress_default(
+            src.as_ptr() as _,
+            output.as_mut_ptr().add(head_reserve + 4) as _,
+            src.len() as i32,
+            bound,
+        );
+        assert!(size > 0);
+        output.set_len(head_reserve + 4 + size as usize);
+        output
+    }
+}
+
+pub fn decode_block(src: &[u8]) -> Vec<u8> {
+    assert!(src.len() > 4, "data is too short: {} <= 4", src.len());
+    unsafe {
+        let len = u32::from_le(read_unaligned(src.as_ptr() as *const u32));
+        let mut dst = Vec::with_capacity(len as usize);
+        let l = LZ4_decompress_safe(
+            src.as_ptr().add(4) as _,
+            dst.as_mut_ptr() as _,
+            src.len() as i32 - 4,
+            dst.capacity() as i32,
+        );
+        assert_eq!(l, len as i32);
+        dst.set_len(l as usize);
+        dst
+    }
+}
+
+// Layout of multi blocks compression:
+// header + decoded_size + vec[encoded_len_and_content] + cap(tail).
+pub fn encode_blocks<'a, F, I>(inputs: F, head_reserve: usize, tail_alloc: usize) -> Vec<u8>
+where
+    F: Fn() -> I,
+    I: Iterator<Item = &'a [u8]>,
+{
+    let (mut encoded_len, mut decoded_len) = (0, 0u64);
+    for buffer in inputs() {
+        let len = buffer.len();
+        decoded_len += len as u64;
+        let size = unsafe { lz4_sys::LZ4_compressBound(len as i32) };
+        assert!(size > 0);
+        encoded_len += (4 + size) as usize; // Length and content.
+    }
+
+    let capacity = head_reserve + 8 + encoded_len + tail_alloc;
+    let mut output: Vec<u8> = Vec::with_capacity(capacity);
+    unsafe {
+        copy_nonoverlapping(
+            decoded_len.to_le_bytes().as_ptr(),
+            output.as_mut_ptr().add(head_reserve),
+            8,
+        );
+
+        let (stream, mut offset) = (lz4_sys::LZ4_createStream(), head_reserve + 8);
+        for buffer in inputs() {
+            let bytes = LZ4_compress_fast_continue(
+                stream,
+                buffer.as_ptr() as _,
+                output.as_mut_ptr().add(offset + 4),
+                buffer.len() as i32,
+                (capacity - offset) as i32,
+                1, /* acceleration */
+            );
+            assert!(bytes > 0);
+            copy_nonoverlapping(
+                (bytes as u32).to_le_bytes().as_ptr(),
+                output.as_mut_ptr().add(offset),
+                4,
+            );
+            offset += (bytes + 4) as usize;
+        }
+
+        lz4_sys::LZ4_freeStream(stream);
+        output.set_len(offset);
+    }
+    output
+}
+
+pub fn decode_blocks(mut src: &[u8]) -> Vec<u8> {
+    assert!(src.len() > 8, "data is too short: {} <= 8", src.len());
+    unsafe {
+        let decoded_len = u64::from_le(read_unaligned(src.as_ptr() as *const u64));
+        let mut dst: Vec<u8> = Vec::with_capacity(decoded_len as usize);
+        src = &src[8..];
+
+        let (decoder, mut offset) = (LZ4_createStreamDecode(), 0);
+        while !src.is_empty() {
+            let len = u32::from_le(read_unaligned(src.as_ptr() as *const u32));
+            let bytes = LZ4_decompress_safe_continue(
+                decoder,
+                src.as_ptr().add(4) as _,
+                dst.as_mut_ptr().add(offset) as _,
+                len as i32,
+                (dst.capacity() - offset) as i32,
+            );
+            assert!(bytes >= 0);
+            offset += bytes as usize;
+            src = &src[(4 + len as usize)..];
+        }
+        LZ4_freeStreamDecode(decoder);
+        assert_eq!(offset, decoded_len as usize);
+        dst.set_len(offset);
+        dst
+    }
+}
+
+extern "C" {
+    // It's not in lz4_sys.
+    fn LZ4_compress_fast_continue(
+        LZ4_stream: *mut LZ4StreamEncode,
+        source: *const u8,
+        dest: *mut u8,
+        input_size: c_int,
+        dest_capacity: c_int,
+        acceleration: c_int,
+    ) -> c_int;
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_basic() {
+        let data: Vec<&'static [u8]> = vec![b"", b"123", b"12345678910"];
+        for d in data {
+            let compressed = encode_block(d, 0, 0);
+            assert!(compressed.len() > 4);
+            let res = decode_block(&compressed);
+            assert_eq!(res, d);
+        }
+    }
+
+    #[test]
+    fn test_blocks() {
+        let raw_inputs = vec![
+            b"".to_vec(),
+            b"123".to_vec(),
+            b"12345678910".to_vec(),
+            vec![b'x'; 99999],
+            vec![0; 33333],
+        ];
+
+        let mut input = Vec::with_capacity(raw_inputs.iter().map(|x| x.len()).sum());
+        for x in &raw_inputs {
+            input.extend_from_slice(x);
+        }
+
+        let encoded = encode_blocks(|| raw_inputs.iter().map(|x| x.as_slice()), 0, 0);
+        let decoded = decode_blocks(&encoded);
+        assert_eq!(input, decoded);
+    }
+}
diff --git a/src/engine.rs b/src/engine.rs
index 338b81b1..08cb9686 100644
--- a/src/engine.rs
+++ b/src/engine.rs
@@ -9,6 +9,7 @@ use protobuf::Message;
 use crate::cache_evict::{
     CacheSubmitor, CacheTask, Runner as CacheEvictRunner, DEFAULT_CACHE_CHUNK_SIZE,
 };
+use crate::compression::{decode_block, decode_blocks};
 use crate::config::{Config, RecoveryMode};
 use crate::log_batch::{
     self, Command, CompressionType, EntryExt, LogBatch, LogItemContent, OpType, CHECKSUM_LEN,
@@ -520,15 +521,19 @@ where
                 let offset = base_offset + offset;
                 pipe_log.fread(queue, file_num, offset, len)?
             }
-            CompressionType::Lz4 => {
-                let read_len = batch_len + HEADER_LEN as u64;
+            c_type @ CompressionType::Lz4 | c_type @ CompressionType::Lz4Blocks => {
+                let read_len = batch_len + HEADER_LEN as u64 + CHECKSUM_LEN as u64;
                 let compressed = pipe_log.fread(queue, file_num, base_offset, read_len)?;
-                let mut reader = compressed.as_ref();
+                log_batch::test_batch_checksum(compressed.as_slice())?;
+
+                let mut reader = compressed.as_slice();
                 let header = codec::decode_u64(&mut reader)?;
                 assert_eq!(header >> 8, batch_len);
-
-                log_batch::test_batch_checksum(reader)?;
-                let buf = log_batch::decompress(&reader[..batch_len as usize - CHECKSUM_LEN]);
+                let buf = match c_type {
+                    CompressionType::Lz4 => decode_block(&reader[..batch_len as usize]),
+                    CompressionType::Lz4Blocks => decode_blocks(&reader[..batch_len as usize]),
+                    _ => unreachable!(),
+                };
                 let start = offset as usize - HEADER_LEN;
                 let end = (offset + len) as usize - HEADER_LEN;
                 buf[start..end].to_vec()
diff --git a/src/iovec.rs b/src/iovec.rs
index f8965413..6d21bbd2 100644
--- a/src/iovec.rs
+++ b/src/iovec.rs
@@ -1,24 +1,79 @@
-use std::io::{Result, Write};
+use std::cmp::Ordering;
+use std::io::{Cursor, Error, ErrorKind, Result, Seek, SeekFrom, Write};

-pub struct LengthFixedIoVec {
+use crc32fast::Hasher;
+use nix::sys::uio::IoVec as NixIoVec;
+
+use crate::compression::{encode_block, encode_blocks};
+use crate::log_batch::CompressionType;
+use crate::util::crc32;
+
+pub trait IoVecs: Write + Seek + Default {
+    fn content_len(&self) -> usize;
+    fn crc32(&self) -> u32;
+    fn compress(
+        &mut self,
+        skip_header: usize,
+        head_reserve: usize,
+        tail_alloc: usize,
+    ) -> CompressionType;
+}
+
+#[derive(Default)]
+pub struct LengthFixedIoVecs {
     row_capacity: usize,
     buffers: Vec<Vec<u8>>,
+    position: Option<(usize, usize)>,
 }

-impl LengthFixedIoVec {
-    pub fn new(row_capacity: usize) -> LengthFixedIoVec {
-        LengthFixedIoVec {
+impl LengthFixedIoVecs {
+    pub fn new(row_capacity: usize) -> LengthFixedIoVecs {
+        LengthFixedIoVecs {
             row_capacity,
             buffers: vec![],
+            position: None,
         }
     }
-}

-impl Write for LengthFixedIoVec {
-    fn write(&mut self, buf: &[u8]) -> Result<usize> {
-        if self.buffers.is_empty() {
-            self.buffers.push(Vec::with_capacity(self.row_capacity));
+    /// If it contains only one single iovec, call `into_vec` instead of `to_nix_iovecs`.
+    pub fn has_single_iovec(&self) -> bool {
+        self.buffers.len() == 1
+    }
+
+    pub fn to_nix_iovecs(&self) -> Vec<NixIoVec<&[u8]>> {
+        let mut vecs = Vec::with_capacity(self.buffers.len());
+        for buffer in &self.buffers {
+            let vec = NixIoVec::from_slice(buffer.as_slice());
+            vecs.push(vec);
+        }
+        vecs
+    }
+
+    pub fn into_vec(mut self) -> Vec<u8> {
+        self.take_vec()
+    }
+
+    fn take_vec(&mut self) -> Vec<u8> {
+        debug_assert!(self.has_single_iovec());
+        let buffers = std::mem::take(&mut self.buffers);
+        buffers.into_iter().next().unwrap()
+    }
+
+    fn reset_vec(&mut self, vec: Vec<u8>) {
+        self.row_capacity = vec.capacity();
+        self.buffers = vec![vec];
+        self.position = None;
+    }
+
+    fn update_position(&mut self, row: usize, offset: usize) {
+        if self.row_capacity * row + offset == self.content_len() {
+            self.position = None;
+            return;
         }
+        self.position = Some((row, offset));
+    }
+
+    fn append(&mut self, buf: &[u8]) -> Result<usize> {
         let buffer = self.buffers.last_mut().unwrap();
         if buffer.len() + buf.len() <= self.row_capacity {
             buffer.extend_from_slice(buf);
@@ -30,7 +85,214 @@ impl Write for LengthFixedIoVec {
         Ok(size)
     }

+    fn pwrite(&mut self, mut buf: &[u8]) -> Result<usize> {
+        let return_value = buf.len();
+
+        let (mut row, mut offset) = self.position.unwrap();
+        debug_assert!(self.row_capacity * row + offset < self.content_len());
+        loop {
+            let src_len = buf.len();
+            let dst_len = self.buffers[row].len() - offset;
+            if dst_len >= src_len {
+                copy_slice(buf, &mut self.buffers[row][offset..], src_len);
+                offset += src_len;
+                break;
+            }
+            copy_slice(buf, &mut self.buffers[row][offset..], dst_len);
+            row += 1;
+            offset = 0;
+            buf = &buf[dst_len..];
+        }
+
+        self.update_position(row, offset);
+        Ok(return_value)
+    }
+}
+
+impl Write for LengthFixedIoVecs {
+    fn write(&mut self, buf: &[u8]) -> Result<usize> {
+        if self.buffers.is_empty() {
+            debug_assert_eq!(self.position.unwrap_or_default(), (0, 0));
+            self.buffers.push(Vec::with_capacity(self.row_capacity));
+            self.position = None;
+        }
+        if let Some((row, offset)) = self.position {
+            let offset = self.row_capacity * row + offset;
+            if offset + buf.len() <= self.content_len() {
+                return self.pwrite(buf);
+            }
+            let len = self.content_len() - offset;
+            self.pwrite(&buf[..len])?;
+            return Ok(len + self.append(&buf[len..])?);
+        }
+        self.append(buf)
+    }
+
     fn flush(&mut self) -> Result<()> {
         Ok(())
     }
 }
+
+impl Seek for LengthFixedIoVecs {
+    fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
+        #[inline]
+        fn seek_from_start(vec: &mut LengthFixedIoVecs, offset: u64) -> Result<u64> {
+            match (offset as usize).cmp(&vec.content_len()) {
+                Ordering::Less => {
+                    let full = offset as usize / vec.row_capacity;
+                    let partial = offset as usize % vec.row_capacity;
+                    vec.position = Some((full, partial));
+                    Ok(offset)
+                }
+                Ordering::Equal => {
+                    vec.position = None;
+                    Ok(offset)
+                }
+                Ordering::Greater => Err(Error::from(ErrorKind::InvalidInput)),
+            }
+        }
+
+        let offset = match pos {
+            SeekFrom::Start(offset) => return seek_from_start(self, offset),
+            SeekFrom::End(offset) => self.content_len() as i64 + offset,
+            SeekFrom::Current(offset) => match self.position {
+                Some((r, off)) => (self.row_capacity * r + off) as i64 + offset,
+                None => self.content_len() as i64 + offset,
+            },
+        };
+        if offset < 0 {
+            return Err(Error::from(ErrorKind::InvalidInput));
+        }
+        seek_from_start(self, offset as u64)
+    }
+}
+
+fn copy_slice(src: &[u8], dst: &mut [u8], len: usize) {
+    debug_assert!(src.len() >= len);
+    debug_assert!(dst.len() >= len);
+    unsafe { std::ptr::copy(src.as_ptr(), dst.as_mut_ptr(), len) }
+}
+
+impl IoVecs for LengthFixedIoVecs {
+    fn content_len(&self) -> usize {
+        if let Some(last) = self.buffers.last() {
+            return (self.buffers.len() - 1) * self.row_capacity + last.len();
+        }
+        0
+    }
+
+    fn crc32(&self) -> u32 {
+        let mut hasher = Hasher::new();
+        for slice in &self.buffers {
+            hasher.update(slice.as_slice());
+        }
+        hasher.finalize()
+    }
+
+    fn compress(
+        &mut self,
+        mut skip_header: usize,
+        head_reserve: usize,
+        tail_alloc: usize,
+    ) -> CompressionType {
+        debug_assert!(self.content_len() > 0);
+        if self.has_single_iovec() {
+            let mut cursor = Cursor::new(self.take_vec());
+            let compress_type = cursor.compress(skip_header, head_reserve, tail_alloc);
+            self.reset_vec(cursor.into_inner());
+            return compress_type;
+        }
+
+        let (mut start_row, mut start_offset) = (0, 0);
+        for buffer in &self.buffers {
+            if buffer.len() > skip_header {
+                start_offset = skip_header;
+                break;
+            }
+            start_row += 1;
+            skip_header -= buffer.len();
+        }
+
+        let iter_on_slices = || {
+            let buffers = self.buffers[start_row..].iter();
+            buffers.enumerate().map(|(i, buffer)| {
+                let begin = if i == 0 { start_offset } else { 0 };
+                &buffer.as_slice()[begin..buffer.len()]
+            })
+        };
+        let output = encode_blocks(iter_on_slices, head_reserve, tail_alloc);
+        self.reset_vec(output);
+        CompressionType::Lz4Blocks
+    }
+}
+
+impl IoVecs for Cursor<Vec<u8>> {
+    fn content_len(&self) -> usize {
+        self.get_ref().len()
+    }
+
+    fn crc32(&self) -> u32 {
+        crc32(self.get_ref().as_slice())
+    }
+
+    fn compress(
+        &mut self,
+        skip_header: usize,
+        head_reserve: usize,
+        tail_alloc: usize,
+    ) -> CompressionType {
+        debug_assert!(self.content_len() > 0);
+        let buf = self.get_ref().as_slice();
+        let vec = encode_block(&buf[skip_header..], head_reserve, tail_alloc);
+        *self = Cursor::new(vec);
+        CompressionType::Lz4
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::compression::{decode_block, decode_blocks};
+
+    #[test]
+    fn test_length_fixed_io_vecs() {
+        let mut iovecs = LengthFixedIoVecs::new(1024);
+        assert_eq!(iovecs.write(vec![b'x'; 128].as_slice()).unwrap(), 128);
+        assert_eq!(iovecs.write(vec![b'y'; 1024].as_slice()).unwrap(), 896);
+        iovecs.write_all(vec![b'z'; 2048 + 128].as_slice()).unwrap();
+        assert_eq!(iovecs.buffers.len(), 4);
+
+        // Test seek + full pwrite.
+        iovecs.seek(SeekFrom::Start(1023)).unwrap();
+        assert_eq!(iovecs.write(&[b'a', b'a']).unwrap(), 2);
+        assert_eq!(iovecs.buffers[0][1023], b'a');
+        assert_eq!(iovecs.buffers[1][0], b'a');
+        assert_eq!(iovecs.position.unwrap(), (1, 1));
+
+        // Test seek + partial pwrite + append.
+        iovecs.seek(SeekFrom::End(-1)).unwrap();
+        assert_eq!(iovecs.write(&[b'a', b'a']).unwrap(), 2);
+        assert_eq!(iovecs.buffers[3][127], b'a');
+        assert_eq!(iovecs.buffers[3][128], b'a');
+        assert!(iovecs.position.is_none());
+
+        // Test seek + partial pwrite.
+        assert_eq!(iovecs.write(vec![b'z'; 895].as_slice()).unwrap(), 895);
+        iovecs.seek(SeekFrom::End(-1)).unwrap();
+        assert_eq!(iovecs.write(&[b'a', b'a']).unwrap(), 1);
+        assert_eq!(iovecs.buffers[3][1023], b'a');
+        assert!(iovecs.position.is_none());
+
+        // Test compression.
+ assert_eq!(iovecs.compress(0, 0, 0), CompressionType::Lz4Blocks); + assert!(iovecs.has_single_iovec()); + let decoded_1 = decode_blocks(&iovecs.into_vec()); + + let mut iovecs1 = LengthFixedIoVecs::default(); + iovecs1.reset_vec(decoded_1.clone()); + assert!(iovecs1.has_single_iovec()); + assert_eq!(iovecs1.compress(0, 0, 0), CompressionType::Lz4); + let decoded_2 = decode_block(&iovecs1.into_vec()); + assert_eq!(decoded_1, decoded_2); + } +} diff --git a/src/lib.rs b/src/lib.rs index 5f661377..686de897 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,6 +17,7 @@ macro_rules! box_err { pub mod codec; mod cache_evict; +mod compression; mod config; mod engine; mod errors; @@ -27,7 +28,7 @@ mod pipe_log; mod purge; mod util; -use crate::iovec::LengthFixedIoVec; +use crate::iovec::{IoVecs, LengthFixedIoVecs}; use crate::pipe_log::PipeLog; pub use self::config::{Config, RecoveryMode}; @@ -113,6 +114,11 @@ mod tests { use crate::log_batch::EntryExt; use raft::eraftpb::Entry; + #[ctor::ctor] + fn init() { + env_logger::init(); + } + impl EntryExt for Entry { fn index(e: &Entry) -> u64 { e.get_index() diff --git a/src/log_batch.rs b/src/log_batch.rs index 2bf3391f..2cc9b763 100644 --- a/src/log_batch.rs +++ b/src/log_batch.rs @@ -1,18 +1,19 @@ use std::borrow::{Borrow, Cow}; use std::cell::RefCell; -use std::io::BufRead; +use std::io::{BufRead, SeekFrom}; use std::marker::PhantomData; use std::{mem, u64}; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; -use crc32fast::Hasher; use protobuf::Message; use crate::cache_evict::CacheTracker; use crate::codec::{self, Error as CodecError, NumberEncoder}; +use crate::compression::{decode_block, decode_blocks}; use crate::memtable::EntryIndex; use crate::pipe_log::LogQueue; -use crate::{Error, LengthFixedIoVec, Result}; +use crate::util::crc32; +use crate::{Error, IoVecs, Result}; pub const HEADER_LEN: usize = 8; pub const CHECKSUM_LEN: usize = 4; @@ -26,89 +27,18 @@ const CMD_COMPACT: u8 = 0x02; const BATCH_MIN_SIZE: usize = HEADER_LEN + CHECKSUM_LEN; const COMPRESSION_THRESHOLD: usize = 4096; -const COMPRESSION_BLOCK: usize = 32 * 1024; - -#[inline] -fn crc32(data: &[u8]) -> u32 { - let mut hasher = Hasher::new(); - hasher.update(data); - hasher.finalize() -} - -mod lz4 { - use std::{i32, ptr}; - - pub fn encode_block(src: &[u8], head_reserve: usize, tail_alloc: usize) -> Vec { - unsafe { - let bound = lz4_sys::LZ4_compressBound(src.len() as i32); - assert!(bound > 0 && src.len() <= i32::MAX as usize); - - // Layout: { header | decoded_len | content | checksum }. 
- let capacity = head_reserve + 4 + bound as usize + tail_alloc; - let mut output: Vec = Vec::with_capacity(capacity); - - let le_len = src.len().to_le_bytes(); - ptr::copy_nonoverlapping(le_len.as_ptr(), output.as_mut_ptr().add(head_reserve), 4); - - let size = lz4_sys::LZ4_compress_default( - src.as_ptr() as _, - output.as_mut_ptr().add(head_reserve + 4) as _, - src.len() as i32, - bound, - ); - assert!(size > 0); - output.set_len(head_reserve + 4 + size as usize); - output - } - } - - pub fn decode_block(src: &[u8]) -> Vec { - assert!(src.len() > 4, "data is too short: {} <= 4", src.len()); - unsafe { - let len = u32::from_le(ptr::read_unaligned(src.as_ptr() as *const u32)); - let mut dst = Vec::with_capacity(len as usize); - let l = lz4_sys::LZ4_decompress_safe( - src.as_ptr().add(4) as _, - dst.as_mut_ptr() as _, - src.len() as i32 - 4, - dst.capacity() as i32, - ); - if l == len as i32 { - dst.set_len(l as usize); - return dst; - } - if l < 0 { - panic!("decompress failed: {}", l); - } else { - panic!("length of decompress result not match {} != {}", len, l); - } - } - } - - #[cfg(test)] - mod tests { - #[test] - fn test_basic() { - let data: Vec<&'static [u8]> = vec![b"", b"123", b"12345678910"]; - for d in data { - let compressed = super::encode_block(d, 0, 0); - assert!(compressed.len() > 4); - let res = super::decode_block(&compressed); - assert_eq!(res, d); - } - } - } -} #[repr(u8)] #[derive(Clone, Copy, PartialEq, Eq, Debug)] pub enum CompressionType { None = 0, Lz4 = 1, + Lz4Blocks = 2, } impl CompressionType { pub fn from_byte(t: u8) -> CompressionType { + debug_assert!(t <= 2); unsafe { mem::transmute(t) } } @@ -162,7 +92,7 @@ impl Entries { let mut entries = Vec::with_capacity(count); let mut entries_index = Vec::with_capacity(count); while count > 0 { - let len = codec::decode_var_u64(buf)? as usize; + let len = codec::decode_u32(buf)? as usize; let mut e = E::new(); e.merge_from_bytes(&buf[..len])?; @@ -183,9 +113,10 @@ impl Entries { Ok(Entries::new(entries, Some(entries_index))) } - pub fn encode_to(&self, vec: &mut Vec, entries_size: &mut usize) -> Result<()> + pub fn encode_to(&self, vec: &mut T, entries_size: &mut usize) -> Result<()> where W: EntryExt, + T: IoVecs, { if self.entries.is_empty() { return Ok(()); @@ -193,23 +124,26 @@ impl Entries { // layout = { entries count | multiple entries } // entries layout = { entry layout | ... | entry layout } - // entry layout = { len | entry content } + // entry layout = { len(u32) | entry content } vec.encode_var_u64(self.entries.len() as u64)?; for (i, e) in self.entries.iter().enumerate() { - let content = e.write_to_bytes()?; - vec.encode_var_u64(content.len() as u64)?; + let prefixed_len_offset = vec.content_len(); + vec.encode_u32(0)?; + e.write_to_writer_without_buffer(vec)?; + let content_len = vec.content_len() - prefixed_len_offset - 4; + vec.seek(SeekFrom::Start(prefixed_len_offset as u64))?; + vec.encode_u32(content_len as u32)?; + vec.seek(SeekFrom::End(0))?; // file_num = 0 means entry index is not initialized. let mut entries_index = self.entries_index.borrow_mut(); if entries_index[i].file_num == 0 { entries_index[i].index = W::index(&e); // This offset doesn't count the header. 
- entries_index[i].offset = vec.len() as u64; - entries_index[i].len = content.len() as u64; + entries_index[i].offset = prefixed_len_offset as u64 + 4; + entries_index[i].len = content_len as u64; *entries_size += entries_index[i].len as usize; } - - vec.extend_from_slice(&content); } Ok(()) } @@ -260,13 +194,13 @@ pub enum Command { } impl Command { - pub fn encode_to(&self, vec: &mut Vec) { + pub fn encode_to(&self, vec: &mut T) { match *self { Command::Clean => { - vec.push(CMD_CLEAN); + vec.write_u8(CMD_CLEAN).unwrap(); } Command::Compact { index } => { - vec.push(CMD_COMPACT); + vec.write_u8(CMD_COMPACT).unwrap(); vec.encode_var_u64(index).unwrap(); } } @@ -292,8 +226,8 @@ pub enum OpType { } impl OpType { - pub fn encode_to(self, vec: &mut Vec) { - vec.push(self as u8); + pub fn encode_to(self, vec: &mut T) { + vec.write_u8(self as u8).unwrap(); } pub fn from_bytes(buf: &mut SliceReader<'_>) -> Result { @@ -338,15 +272,15 @@ impl KeyValue { } } - pub fn encode_to(&self, vec: &mut Vec) -> Result<()> { + pub fn encode_to(&self, vec: &mut T) -> Result<()> { // layout = { op_type | k_len | key | v_len | value } self.op_type.encode_to(vec); vec.encode_var_u64(self.key.len() as u64)?; - vec.extend_from_slice(self.key.as_slice()); + vec.write_all(self.key.as_slice())?; match self.op_type { OpType::Put => { vec.encode_var_u64(self.value.as_ref().unwrap().len() as u64)?; - vec.extend_from_slice(self.value.as_ref().unwrap().as_slice()); + vec.write_all(self.value.as_ref().unwrap().as_slice())?; } OpType::Del => {} } @@ -400,23 +334,24 @@ impl LogItem { } } - pub fn encode_to(&self, vec: &mut Vec, entries_size: &mut usize) -> Result<()> + pub fn encode_to(&self, vec: &mut T, entries_size: &mut usize) -> Result<()> where W: EntryExt, + T: IoVecs, { // layout = { 8 byte id | 1 byte type | item layout } vec.encode_var_u64(self.raft_group_id)?; match self.content { LogItemContent::Entries(ref entries) => { - vec.push(TYPE_ENTRIES); - entries.encode_to::(vec, entries_size)?; + vec.write_u8(TYPE_ENTRIES)?; + entries.encode_to::(vec, entries_size)?; } LogItemContent::Command(ref command) => { - vec.push(TYPE_COMMAND); + vec.write_u8(TYPE_COMMAND)?; command.encode_to(vec); } LogItemContent::Kv(ref kv) => { - vec.push(TYPE_KV); + vec.write_u8(TYPE_KV)?; kv.encode_to(vec)?; } } @@ -557,14 +492,17 @@ where return Err(Error::TooShort); } - let header = codec::decode_u64(buf)? as usize; + let mut header_buf = *buf; + let header = codec::decode_u64(&mut header_buf)? as usize; let batch_len = header >> 8; let batch_type = CompressionType::from_byte(header as u8); - test_batch_checksum(&buf[..batch_len])?; + test_batch_checksum(&buf[..batch_len + HEADER_LEN + CHECKSUM_LEN])?; + *buf = &buf[HEADER_LEN..]; let decompressed = match batch_type { - CompressionType::None => Cow::Borrowed(&buf[..(batch_len - CHECKSUM_LEN)]), - CompressionType::Lz4 => Cow::Owned(decompress(&buf[..batch_len - CHECKSUM_LEN])), + CompressionType::None => Cow::Borrowed(&buf[..batch_len]), + CompressionType::Lz4 => Cow::Owned(decode_block(&buf[..batch_len])), + CompressionType::Lz4Blocks => Cow::Owned(decode_blocks(&buf[..batch_len])), }; let mut reader: SliceReader = decompressed.borrow(); @@ -586,7 +524,7 @@ where items_count -= 1; } assert!(reader.is_empty()); - buf.consume(batch_len); + buf.consume(batch_len + CHECKSUM_LEN); for item in &log_batch.items { if let LogItemContent::Entries(ref entries) = item.content { @@ -598,42 +536,44 @@ where } // TODO: avoid to write a large batch into one compressed chunk. 
- pub fn encode_to_bytes(&self) -> Option> { + pub fn encode_to_bytes(&self, vec: &mut T) -> bool { + assert_eq!(vec.content_len(), 0); if self.items.is_empty() { - return None; + return false; } // layout = { 8 bytes len | item count | multiple items | 4 bytes checksum } - let mut vec = LengthFixedIoVec::new(COMPRESSION_BLOCK); vec.encode_u64(0).unwrap(); vec.encode_var_u64(self.items.len() as u64).unwrap(); for item in &self.items { - item.encode_to::(&mut vec, &mut *self.entries_size.borrow_mut()) + item.encode_to::(vec, &mut *self.entries_size.borrow_mut()) .unwrap(); } - let compression_type = if vec.len() > COMPRESSION_THRESHOLD { - vec = lz4::encode_block(&vec[HEADER_LEN..], HEADER_LEN, 4); - CompressionType::Lz4 + let compression_type = if vec.content_len() > COMPRESSION_THRESHOLD { + vec.compress(HEADER_LEN, HEADER_LEN, CHECKSUM_LEN) } else { CompressionType::None }; - let checksum = crc32(&vec[8..]); - vec.encode_u32_le(checksum).unwrap(); - let len = vec.len() as u64 - 8; + let len = (vec.content_len() - HEADER_LEN) as u64; let mut header = len << 8; header |= u64::from(compression_type.to_byte()); - vec.as_mut_slice().write_u64::(header).unwrap(); + vec.seek(SeekFrom::Start(0)).unwrap(); + vec.write_u64::(header).unwrap(); + vec.seek(SeekFrom::End(0)).unwrap(); - let batch_len = (vec.len() - 8) as u64; + let checksum = vec.crc32(); + vec.encode_u32_le(checksum).unwrap(); + + let batch_len = (vec.content_len() - HEADER_LEN - CHECKSUM_LEN) as u64; for item in &self.items { if let LogItemContent::Entries(ref entries) = item.content { entries.update_compression_type(compression_type, batch_len as u64); } } - Some(vec) + true } pub fn entries_size(&self) -> usize { @@ -641,6 +581,7 @@ where } } +/// The passed in buf's layout: { header | content | checksum }. pub fn test_batch_checksum(buf: &[u8]) -> Result<()> { if buf.len() <= CHECKSUM_LEN { return Err(Error::TooShort); @@ -656,16 +597,11 @@ pub fn test_batch_checksum(buf: &[u8]) -> Result<()> { Ok(()) } -// NOTE: lz4::decode_block will truncate the output buffer first. 
-pub fn decompress(buf: &[u8]) -> Vec { - self::lz4::decode_block(buf) -} - #[cfg(test)] mod tests { use super::*; - use raft::eraftpb::Entry; + use std::io::Cursor; #[test] fn test_entries_enc_dec() { @@ -673,14 +609,14 @@ mod tests { let file_num = 1; let entries = Entries::new(pb_entries, None); - let (mut encoded, mut entries_size1) = (vec![], 0); + let (mut encoded, mut entries_size1) = (Cursor::new(vec![]), 0); entries - .encode_to::(&mut encoded, &mut entries_size1) + .encode_to::>>(&mut encoded, &mut entries_size1) .unwrap(); for idx in entries.entries_index.borrow_mut().iter_mut() { idx.file_num = file_num; } - let (mut s, mut entries_size2) = (encoded.as_slice(), 0); + let (mut s, mut entries_size2) = (encoded.get_ref().as_slice(), 0); let decode_entries = Entries::from_bytes::(&mut s, file_num, 0, 0, &mut entries_size2).unwrap(); assert_eq!(s.len(), 0); @@ -691,9 +627,9 @@ mod tests { #[test] fn test_command_enc_dec() { let cmd = Command::Clean; - let mut encoded = vec![]; + let mut encoded = Cursor::new(vec![]); cmd.encode_to(&mut encoded); - let mut bytes_slice = encoded.as_slice(); + let mut bytes_slice = encoded.get_ref().as_slice(); let decoded_cmd = Command::from_bytes(&mut bytes_slice).unwrap(); assert_eq!(bytes_slice.len(), 0); assert_eq!(cmd, decoded_cmd); @@ -702,9 +638,9 @@ mod tests { #[test] fn test_kv_enc_dec() { let kv = KeyValue::new(OpType::Put, b"key".to_vec(), Some(b"value".to_vec())); - let mut encoded = vec![]; + let mut encoded = Cursor::new(vec![]); kv.encode_to(&mut encoded).unwrap(); - let mut bytes_slice = encoded.as_slice(); + let mut bytes_slice = encoded.get_ref().as_slice(); let decoded_kv = KeyValue::from_bytes(&mut bytes_slice).unwrap(); assert_eq!(bytes_slice.len(), 0); assert_eq!(kv, decoded_kv); @@ -725,10 +661,10 @@ mod tests { ]; for item in items { - let (mut encoded, mut entries_size1) = (vec![], 0); - item.encode_to::(&mut encoded, &mut entries_size1) + let (mut encoded, mut entries_size1) = (Cursor::new(vec![]), 0); + item.encode_to::>>(&mut encoded, &mut entries_size1) .unwrap(); - let (mut s, mut entries_size2) = (encoded.as_slice(), 0); + let (mut s, mut entries_size2) = (encoded.get_ref().as_slice(), 0); let decoded_item = LogItem::from_bytes::(&mut s, 0, 0, 0, &mut entries_size2).unwrap(); assert_eq!(s.len(), 0); @@ -747,8 +683,9 @@ mod tests { batch.put(region_id, b"key".to_vec(), b"value".to_vec()); batch.delete(region_id, b"key2".to_vec()); - let encoded = batch.encode_to_bytes().unwrap(); - let mut s = encoded.as_slice(); + let mut encoded = Cursor::new(vec![]); + batch.encode_to_bytes(&mut encoded); + let mut s = encoded.get_ref().as_slice(); let decoded_batch = LogBatch::from_bytes(&mut s, 0, 0).unwrap().unwrap(); assert_eq!(s.len(), 0); assert_eq!(batch, decoded_batch); diff --git a/src/pipe_log.rs b/src/pipe_log.rs index 13b7d5b6..beb7810c 100644 --- a/src/pipe_log.rs +++ b/src/pipe_log.rs @@ -10,7 +10,7 @@ use log::{info, warn}; use nix::errno::Errno; use nix::fcntl::{self, OFlag}; use nix::sys::stat::Mode; -use nix::sys::uio::{pread, pwrite}; +use nix::sys::uio::{pread, pwrite, pwritev, IoVec as NixIoVec}; use nix::unistd::{close, fsync, ftruncate, lseek, Whence}; use nix::NixPath; use protobuf::Message; @@ -19,7 +19,7 @@ use crate::cache_evict::CacheSubmitor; use crate::config::Config; use crate::log_batch::{EntryExt, LogBatch, LogItemContent}; use crate::util::HandyRwLock; -use crate::{Error, Result}; +use crate::{Error, IoVecs, LengthFixedIoVecs, Result}; const LOG_SUFFIX: &str = ".raftlog"; const LOG_SUFFIX_LEN: usize = 
8; @@ -39,6 +39,8 @@ const DEFAULT_FILES_COUNT: usize = 32; #[cfg(target_os = "linux")] const FILE_ALLOCATE_SIZE: usize = 2 * 1024 * 1024; +const IO_VEC_SIZE: usize = 32 * 1024; + pub trait GenericPipeLog: Sized + Clone + Send { /// Read some bytes from the given position. fn fread(&self, queue: LogQueue, file_num: u64, offset: u64, len: u64) -> Result>; @@ -51,6 +53,13 @@ pub trait GenericPipeLog: Sized + Clone + Send { sync: &mut bool, ) -> Result<(u64, u64, Arc)>; + fn appendv( + &self, + queue: LogQueue, + content: LengthFixedIoVecs, + sync: &mut bool, + ) -> Result<(u64, u64, Arc)>; + /// Close the pipe log. fn close(&self) -> Result<()>; @@ -431,11 +440,28 @@ impl GenericPipeLog for PipeLog { content: &[u8], sync: &mut bool, ) -> Result<(u64, u64, Arc)> { - let (file_num, offset, fd) = self.mut_queue(queue).on_append(content.len(), sync)?; + let len = content.len(); + let (file_num, offset, fd) = self.mut_queue(queue).on_append(len, sync)?; pwrite_exact(fd.0, offset, content)?; Ok((file_num, offset, fd)) } + fn appendv( + &self, + queue: LogQueue, + content: LengthFixedIoVecs, + sync: &mut bool, + ) -> Result<(u64, u64, Arc)> { + if content.has_single_iovec() { + let content = content.into_vec(); + return self.append(queue, &content, sync); + } + let len = content.content_len(); + let (file_num, offset, fd) = self.mut_queue(queue).on_append(len, sync)?; + pwritev_exact(fd.0, offset, &mut content.to_nix_iovecs())?; + Ok((file_num, offset, fd)) + } + fn close(&self) -> Result<()> { let _write_lock = self.cache_submitor.lock().unwrap(); self.appender.wl().truncate_active_log(None)?; @@ -449,27 +475,30 @@ impl GenericPipeLog for PipeLog { mut sync: bool, file_num: &mut u64, ) -> Result { - if let Some(content) = batch.encode_to_bytes() { - // TODO: `pwrite` is performed in the mutex. Is it possible for concurrence? - let mut cache_submitor = self.cache_submitor.lock().unwrap(); - let (cur_file_num, offset, fd) = self.append(LogQueue::Append, &content, &mut sync)?; - let tracker = - cache_submitor.get_cache_tracker(cur_file_num, offset, batch.entries_size()); - drop(cache_submitor); - if sync { - fsync(fd.0).map_err(|e| parse_nix_error(e, "fsync"))?; - } + let mut content = LengthFixedIoVecs::new(IO_VEC_SIZE); + if !batch.encode_to_bytes(&mut content) { + return Ok(0); + } + let entries_size = batch.entries_size(); + let content_len = content.content_len(); + + // TODO: `pwrite` is performed in the mutex. Is it possible for concurrence? 
+ let mut cache_submitor = self.cache_submitor.lock().unwrap(); + let (cur_file_num, offset, fd) = self.appendv(LogQueue::Append, content, &mut sync)?; + let tracker = cache_submitor.get_cache_tracker(cur_file_num, offset, entries_size); + drop(cache_submitor); + if sync { + fsync(fd.0).map_err(|e| parse_nix_error(e, "fsync"))?; + } - for item in &batch.items { - if let LogItemContent::Entries(ref entries) = item.content { - entries.update_position(LogQueue::Append, cur_file_num, offset, &tracker); - } + for item in &batch.items { + if let LogItemContent::Entries(ref entries) = item.content { + entries.update_position(LogQueue::Append, cur_file_num, offset, &tracker); } - - *file_num = cur_file_num; - return Ok(content.len()); } - Ok(0) + + *file_num = cur_file_num; + Ok(content_len) } fn rewrite>( @@ -478,20 +507,23 @@ impl GenericPipeLog for PipeLog { mut sync: bool, file_num: &mut u64, ) -> Result { - if let Some(content) = batch.encode_to_bytes() { - let (cur_file_num, offset, fd) = self.append(LogQueue::Rewrite, &content, &mut sync)?; - if sync { - fsync(fd.0).map_err(|e| parse_nix_error(e, "fsync"))?; - } - for item in &batch.items { - if let LogItemContent::Entries(ref entries) = item.content { - entries.update_position(LogQueue::Rewrite, cur_file_num, offset, &None); - } + let mut content = LengthFixedIoVecs::new(IO_VEC_SIZE); + if !batch.encode_to_bytes(&mut content) { + return Ok(0); + } + let content_len = content.content_len(); + + let (cur_file_num, offset, fd) = self.appendv(LogQueue::Rewrite, content, &mut sync)?; + if sync { + fsync(fd.0).map_err(|e| parse_nix_error(e, "fsync"))?; + } + for item in &batch.items { + if let LogItemContent::Entries(ref entries) = item.content { + entries.update_position(LogQueue::Rewrite, cur_file_num, offset, &None); } - *file_num = cur_file_num; - return Ok(content.len()); } - Ok(0) + *file_num = cur_file_num; + Ok(content_len) } fn truncate_active_log(&self, queue: LogQueue, offset: Option) -> Result<()> { @@ -659,6 +691,45 @@ fn pwrite_exact(fd: RawFd, mut offset: u64, content: &[u8]) -> Result<()> { Ok(()) } +fn pwritev_exact<'a>( + fd: RawFd, + mut offset: u64, + mut vecs: &'a mut [NixIoVec<&'a [u8]>], +) -> Result<()> { + fn consume<'a>( + mut vecs: &'a mut [NixIoVec<&'a [u8]>], + mut bytes: usize, + ) -> &'a mut [NixIoVec<&'a [u8]>] { + debug_assert!(!vecs.is_empty()); + loop { + if bytes == 0 { + return vecs; + } + let buf_len = vecs[0].as_slice().len(); + if buf_len > bytes { + break; + } + bytes -= buf_len; + vecs = &mut vecs[1..]; + } + let buf: NixIoVec<&'a [u8]> = std::mem::replace(&mut vecs[0], NixIoVec::from_slice(&[])); + let buf: &'a [u8] = unsafe { std::mem::transmute(buf.as_slice()) }; + vecs[0] = NixIoVec::from_slice(&buf[bytes..]); + vecs + } + + while !vecs.is_empty() { + let bytes = match pwritev(fd, vecs, offset as _) { + Ok(bytes) => bytes, + Err(e) if e.as_errno() == Some(Errno::EAGAIN) => continue, + Err(e) => return Err(parse_nix_error(e, "pwrite")), + }; + offset += bytes as u64; + vecs = consume(vecs, bytes); + } + Ok(()) +} + fn write_file_header(fd: RawFd) -> Result { let len = FILE_MAGIC_HEADER.len() + VERSION.len(); let mut header = Vec::with_capacity(len); diff --git a/src/util.rs b/src/util.rs index d3dd63b8..98bcdaf4 100644 --- a/src/util.rs +++ b/src/util.rs @@ -11,6 +11,7 @@ use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use std::thread::{Builder as ThreadBuilder, JoinHandle}; use std::time::Duration; +use crc32fast::Hasher; use crossbeam::channel::{bounded, unbounded, Receiver, 
RecvTimeoutError, Sender}; use log::warn; use serde::de::{self, Unexpected, Visitor}; @@ -321,3 +322,10 @@ impl Drop for Worker { self.stop(); } } + +#[inline] +pub fn crc32(data: &[u8]) -> u32 { + let mut hasher = Hasher::new(); + hasher.update(data); + hasher.finalize() +}
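
Editor's note: for reference, the multi-block framing that `encode_blocks` and `decode_blocks` agree on can be walked without calling into LZ4 at all: an 8-byte little-endian total decoded size, followed by one 4-byte little-endian compressed length plus payload per block. The sketch below is not part of the patch and the helper name `split_compressed_blocks` is made up; it only illustrates that layout, assuming the buffer was produced with `head_reserve = 0` and `tail_alloc = 0`.

use std::convert::TryInto;

/// Hypothetical helper (not in the patch): split the output of `encode_blocks`
/// into its compressed block payloads, returning the recorded decoded length.
fn split_compressed_blocks(mut buf: &[u8]) -> (u64, Vec<&[u8]>) {
    assert!(buf.len() > 8, "data is too short");
    // 8-byte little-endian total decoded size written at the head.
    let decoded_len = u64::from_le_bytes(buf[..8].try_into().unwrap());
    buf = &buf[8..];
    let mut blocks = Vec::new();
    while !buf.is_empty() {
        // Each block is prefixed with its 4-byte little-endian compressed length.
        let len = u32::from_le_bytes(buf[..4].try_into().unwrap()) as usize;
        blocks.push(&buf[4..4 + len]);
        buf = &buf[4 + len..];
    }
    (decoded_len, blocks)
}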