From 30347158dcb24960f19d271ec4de6e9398c3898f Mon Sep 17 00:00:00 2001 From: Arkadiy Paronyan Date: Fri, 21 Jul 2023 12:57:39 +0200 Subject: [PATCH] Use mmap IO for value tables (#214) * Experimental mmap IO for tables * Eliminate extra copy on read * Reclaim overlay mem * Tweak reclaim params * Bump version * Style * Style * Loom workaround * Loom workaround * fmt * Destroy old map first * Really destroy old map first * Reserve address space for the file mapping * aarch64 CI test * aarch64 CI test * Align file size * aarch64 CI test fix --- Cargo.toml | 2 +- admin/src/lib.rs | 4 +- src/column.rs | 2 +- src/file.rs | 198 +++++++++++++++++++++-------------------------- src/index.rs | 36 ++++----- src/log.rs | 183 +++++++++++++++++++++++++++++++++++++------ src/table.rs | 153 ++++++++++++++++++++---------------- 7 files changed, 356 insertions(+), 222 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2a07b5e8..c88cc96b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "parity-db" -version = "0.4.9" +version = "0.4.10" authors = ["Parity Technologies "] edition = "2021" license = "MIT OR Apache-2.0" diff --git a/admin/src/lib.rs b/admin/src/lib.rs index 6236bd38..c15f0160 100644 --- a/admin/src/lib.rs +++ b/admin/src/lib.rs @@ -114,12 +114,12 @@ pub fn run() -> Result<(), String> { let mut db_options = options.clone(); if args.compress { - for mut c in &mut db_options.columns { + for c in &mut db_options.columns { c.compression = parity_db::CompressionType::Lz4; } } if args.uniform { - for mut c in &mut db_options.columns { + for c in &mut db_options.columns { c.uniform = true; } } diff --git a/src/column.rs b/src/column.rs index 53521cfd..36c1ebb8 100644 --- a/src/column.rs +++ b/src/column.rs @@ -27,7 +27,7 @@ use std::{ }, }; -const MIN_INDEX_BITS: u8 = 16; +pub const MIN_INDEX_BITS: u8 = 16; // Measured in index entries const MAX_REINDEX_BATCH: usize = 8192; diff --git a/src/file.rs b/src/file.rs index 0d886d6c..34c142fa 100644 --- a/src/file.rs +++ b/src/file.rs @@ -4,12 +4,19 @@ //! Utilities for db file. use crate::{ - error::{try_io, Error, Result}, - parking_lot::{RwLock, RwLockUpgradableReadGuard, RwLockWriteGuard}, + error::{try_io, Result}, + parking_lot::RwLock, table::TableId, }; use std::sync::atomic::{AtomicU64, Ordering}; +#[cfg(not(test))] +const RESERVE_ADDRESS_SPACE: usize = 1024 * 1024 * 1024; // 1 Gb + +// Use different value for tests to work around docker limits on the test machine. +#[cfg(test)] +const RESERVE_ADDRESS_SPACE: usize = 64 * 1024 * 1024; // 64 Mb + #[cfg(target_os = "linux")] fn disable_read_ahead(file: &std::fs::File) -> std::io::Result<()> { use std::os::unix::io::AsRawFd; @@ -36,42 +43,34 @@ fn disable_read_ahead(_file: &std::fs::File) -> std::io::Result<()> { Ok(()) } -// `File::sync_data` uses F_FULLSYNC fcntl on MacOS. It it supposed to be -// the safest way to make sure data is fully persisted. However starting from -// MacOS 11.0 it severely degrades parallel write performance, even when writing to -// other files. Regular `fsync` is good enough for our use case. -// SSDs used in modern macs seem to be able to flush data even on unexpected power loss. -// We performed some testing with power shutdowns and kernel panics on both mac hardware -// and VMs and in all cases `fsync` was enough to prevent data corruption. 
-#[cfg(target_os = "macos")] -fn fsync(file: &std::fs::File) -> std::io::Result<()> { - use std::os::unix::io::AsRawFd; - if unsafe { libc::fsync(file.as_raw_fd()) } != 0 { - Err(std::io::Error::last_os_error()) - } else { - Ok(()) +#[cfg(unix)] +pub fn madvise_random(map: &mut memmap2::MmapMut) { + unsafe { + libc::madvise(map.as_mut_ptr() as _, map.len(), libc::MADV_RANDOM); } } -#[cfg(not(target_os = "macos"))] -fn fsync(file: &std::fs::File) -> std::io::Result<()> { - file.sync_data() -} +#[cfg(not(unix))] +pub fn madvise_random(_id: TableId, _map: &mut memmap2::MmapMut) {} const GROW_SIZE_BYTES: u64 = 256 * 1024; #[derive(Debug)] pub struct TableFile { - pub file: RwLock>, + pub map: RwLock>, pub path: std::path::PathBuf, pub capacity: AtomicU64, pub id: TableId, } +fn map_len(file_len: u64) -> usize { + file_len as usize + RESERVE_ADDRESS_SPACE +} + impl TableFile { pub fn open(filepath: std::path::PathBuf, entry_size: u16, id: TableId) -> Result { let mut capacity = 0u64; - let file = if std::fs::metadata(&filepath).is_ok() { + let map = if std::fs::metadata(&filepath).is_ok() { let file = try_io!(std::fs::OpenOptions::new() .read(true) .write(true) @@ -81,17 +80,20 @@ impl TableFile { if len == 0 { // Preallocate. capacity += GROW_SIZE_BYTES / entry_size as u64; - try_io!(file.set_len(capacity * entry_size as u64)); + try_io!(file.set_len(GROW_SIZE_BYTES)); } else { capacity = len / entry_size as u64; } - Some(file) + let mut map = + try_io!(unsafe { memmap2::MmapOptions::new().len(map_len(len)).map_mut(&file) }); + madvise_random(&mut map); + Some((map, file)) } else { None }; Ok(TableFile { path: filepath, - file: RwLock::new(file), + map: RwLock::new(map), capacity: AtomicU64::new(capacity), id, }) @@ -108,111 +110,87 @@ impl TableFile { Ok(file) } - #[cfg(unix)] pub fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<()> { - use std::os::unix::fs::FileExt; - try_io!(self - .file - .read() - .as_ref() - .ok_or_else(|| Error::Corruption("File does not exist.".into()))? 
- .read_exact_at(buf, offset)); - Ok(()) - } - - #[cfg(unix)] - pub fn write_at(&self, buf: &[u8], offset: u64) -> Result<()> { - use std::os::unix::fs::FileExt; - try_io!(self.file.read().as_ref().unwrap().write_all_at(buf, offset)); + let offset = offset as usize; + let map = self.map.read(); + let (map, _) = map.as_ref().unwrap(); + buf.copy_from_slice(&map[offset..offset + buf.len()]); Ok(()) } - #[cfg(windows)] - pub fn read_at(&self, mut buf: &mut [u8], mut offset: u64) -> Result<()> { - use crate::error::Error; - use std::{io, os::windows::fs::FileExt}; - - let file = self.file.read(); - let file = file.as_ref().ok_or_else(|| Error::Corruption("File does not exist.".into()))?; - - while !buf.is_empty() { - match file.seek_read(buf, offset) { - Ok(0) => break, - Ok(n) => { - buf = &mut buf[n..]; - offset += n as u64; - }, - Err(ref e) if e.kind() == io::ErrorKind::Interrupted => { - // Try again - }, - Err(e) => return Err(Error::Io(e)), - } - } - - if !buf.is_empty() { - Err(Error::Io(io::Error::new( - io::ErrorKind::UnexpectedEof, - "failed to fill whole buffer", - ))) - } else { - Ok(()) - } + pub fn slice_at(&self, offset: u64, len: usize) -> &[u8] { + let offset = offset as usize; + let map = self.map.read(); + let (map, _) = map.as_ref().unwrap(); + let data: &[u8] = unsafe { + let ptr = map.as_ptr().add(offset); + std::slice::from_raw_parts(ptr, len) + }; + data } - #[cfg(windows)] - pub fn write_at(&self, mut buf: &[u8], mut offset: u64) -> Result<()> { - use crate::error::Error; - use std::{io, os::windows::fs::FileExt}; - - let file = self.file.read(); - let file = file.as_ref().unwrap(); - - while !buf.is_empty() { - match file.seek_write(buf, offset) { - Ok(0) => - return Err(Error::Io(io::Error::new( - io::ErrorKind::WriteZero, - "failed to write whole buffer", - ))), - Ok(n) => { - buf = &buf[n..]; - offset += n as u64; - }, - Err(ref e) if e.kind() == io::ErrorKind::Interrupted => { - // Try again - }, - Err(e) => return Err(Error::Io(e)), - } - } - + pub fn write_at(&self, buf: &[u8], offset: u64) -> Result<()> { + let map = self.map.read(); + let (map, _) = map.as_ref().unwrap(); + let offset = offset as usize; + + // Nasty mutable pointer cast. We do ensure that all chunks that are being written are + // accessed through the overlay in other threads. + let ptr: *mut u8 = map.as_ptr() as *mut u8; + let data: &mut [u8] = unsafe { + let ptr = ptr.add(offset); + std::slice::from_raw_parts_mut(ptr, buf.len()) + }; + data.copy_from_slice(buf); Ok(()) } pub fn grow(&self, entry_size: u16) -> Result<()> { - let mut capacity = self.capacity.load(Ordering::Relaxed); - capacity += GROW_SIZE_BYTES / entry_size as u64; - + let mut map_and_file = self.map.write(); + let new_len = match map_and_file.as_mut() { + None => { + let file = self.create_file()?; + let len = GROW_SIZE_BYTES; + try_io!(file.set_len(len)); + let mut map = try_io!(unsafe { + memmap2::MmapOptions::new().len(RESERVE_ADDRESS_SPACE).map_mut(&file) + }); + madvise_random(&mut map); + *map_and_file = Some((map, file)); + len + }, + Some((map, file)) => { + let new_len = try_io!(file.metadata()).len() + GROW_SIZE_BYTES; + try_io!(file.set_len(new_len)); + if map.len() < new_len as usize { + let mut new_map = try_io!(unsafe { + memmap2::MmapOptions::new().len(map_len(new_len)).map_mut(&*file) + }); + madvise_random(&mut new_map); + let old_map = std::mem::replace(map, new_map); + try_io!(old_map.flush()); + // Leak the old mapping as there might be concurrent readers. 
+ std::mem::forget(old_map); + } + new_len + }, + }; + let capacity = new_len / entry_size as u64; self.capacity.store(capacity, Ordering::Relaxed); - let mut file = self.file.upgradable_read(); - if file.is_none() { - let mut wfile = RwLockUpgradableReadGuard::upgrade(file); - *wfile = Some(self.create_file()?); - file = RwLockWriteGuard::downgrade_to_upgradable(wfile); - } - try_io!(file.as_ref().unwrap().set_len(capacity * entry_size as u64)); Ok(()) } pub fn flush(&self) -> Result<()> { - if let Some(file) = self.file.read().as_ref() { - try_io!(fsync(file)); + if let Some((map, _)) = self.map.read().as_ref() { + try_io!(map.flush()); } Ok(()) } pub fn remove(&self) -> Result<()> { - let mut file = self.file.write(); - if let Some(file) = file.take() { + let mut map = self.map.write(); + if let Some((map, file)) = map.take() { + drop(map); drop(file); try_io!(std::fs::remove_file(&self.path)); } diff --git a/src/index.rs b/src/index.rs index 5cec3318..a022f1a2 100644 --- a/src/index.rs +++ b/src/index.rs @@ -2,9 +2,10 @@ // This file is dual-licensed as Apache-2.0 or MIT. use crate::{ - column::ColId, + column::{ColId, MIN_INDEX_BITS}, display::hex, error::{try_io, Error, Result}, + file::madvise_random, log::{LogQuery, LogReader, LogWriter}, parking_lot::{RwLock, RwLockUpgradableReadGuard, RwLockWriteGuard}, stats::{self, ColumnStats}, @@ -142,20 +143,6 @@ fn file_size(index_bits: u8) -> u64 { total_entries(index_bits) * 8 + META_SIZE as u64 } -#[cfg(unix)] -fn madvise_random(id: TableId, map: &mut memmap2::MmapMut) { - unsafe { - libc::madvise( - map.as_mut_ptr() as _, - file_size(id.index_bits()) as usize, - libc::MADV_RANDOM, - ); - } -} - -#[cfg(not(unix))] -fn madvise_random(_id: TableId, _map: &mut memmap2::MmapMut) {} - #[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] pub struct TableId(u16); @@ -195,6 +182,20 @@ impl TableId { pub fn total_entries(&self) -> u64 { total_entries(self.index_bits()) } + + pub fn log_index(&self) -> usize { + self.col() as usize * (64 - MIN_INDEX_BITS) as usize + self.index_bits() as usize + } + + pub fn from_log_index(i: usize) -> Self { + let col = i / (64 - MIN_INDEX_BITS) as usize; + let bits = i % (64 - MIN_INDEX_BITS) as usize; + TableId::new(col as ColId, bits as u8) + } + + pub const fn max_log_indicies(num_columns: usize) -> usize { + (64 - MIN_INDEX_BITS) as usize * num_columns + } } impl std::fmt::Display for TableId { @@ -216,7 +217,7 @@ impl IndexTable { try_io!(file.set_len(file_size(id.index_bits()))); let mut map = try_io!(unsafe { memmap2::MmapMut::map_mut(&file) }); - madvise_random(id, &mut map); + madvise_random(&mut map); log::debug!(target: "parity-db", "Opened existing index {}", id); Ok(Some(IndexTable { id, path, map: RwLock::new(Some(map)) })) } @@ -564,7 +565,7 @@ impl IndexTable { log::debug!(target: "parity-db", "Created new index {}", self.id); try_io!(file.set_len(file_size(self.id.index_bits()))); let mut mmap = try_io!(unsafe { memmap2::MmapMut::map_mut(&file) }); - madvise_random(self.id, &mut mmap); + madvise_random(&mut mmap); *wmap = Some(mmap); map = RwLockWriteGuard::downgrade_to_upgradable(wmap); } @@ -641,7 +642,6 @@ mod test { use super::*; use rand::{Rng, SeedableRng}; use std::path::PathBuf; - #[cfg(feature = "bench")] use test::Bencher; #[cfg(feature = "bench")] diff --git a/src/log.rs b/src/log.rs index 19e54ba9..882b7ea3 100644 --- a/src/log.rs +++ b/src/log.rs @@ -24,6 +24,13 @@ const INSERT_VALUE: u8 = 3; const END_RECORD: u8 = 4; const DROP_TABLE: u8 = 5; +// Once index overly uses less than 1/10 
of its capacity, it will be reclaimed. +const INDEX_OVERLAY_RECLAIM_FACTOR: usize = 10; +// Once value overly uses less than 1/10 of its capacity, it will be reclaimed. +const VALUE_OVERLAY_RECLAIM_FACTOR: usize = 10; +// Min number of value items to initiate reclaim. Each item is around 40 bytes. +const VALUE_OVERLAY_MIN_RECLAIM_ITEMS: usize = 10240; + #[derive(Debug)] pub struct InsertIndexAction { pub table: IndexTableId, @@ -46,6 +53,10 @@ pub enum LogAction { } pub trait LogQuery { + type ValueRef<'a>: std::ops::Deref + where + Self: 'a; + fn with_index R>( &self, table: IndexTableId, @@ -53,37 +64,91 @@ pub trait LogQuery { f: F, ) -> Option; fn value(&self, table: ValueTableId, index: u64, dest: &mut [u8]) -> bool; + fn value_ref<'a>(&'a self, table: ValueTableId, index: u64) -> Option>; } -#[derive(Default, Debug)] +#[derive(Debug)] pub struct LogOverlays { - index: HashMap, - value: HashMap, - last_record_id: HashMap, + index: Vec, + value: Vec, + last_record_ids: Vec, } impl LogOverlays { pub fn last_record_id(&self, col: ColId) -> u64 { - self.last_record_id.get(&col).cloned().unwrap_or(u64::MAX) + self.last_record_ids.get(col as usize).cloned().unwrap_or(u64::MAX) + } + + pub fn with_columns(count: usize) -> Self { + Self { + index: (0..IndexTableId::max_log_indicies(count)) + .map(|_| IndexLogOverlay::default()) + .collect(), + value: (0..ValueTableId::max_log_tables(count)) + .map(|_| ValueLogOverlay::default()) + .collect(), + last_record_ids: (0..count).map(|_| 0).collect(), + } } } +// Loom is missing support for guard projection, so we copy the data as a workaround. +#[cfg(feature = "loom")] +pub struct MappedBytesGuard<'a> { + _phantom: std::marker::PhantomData<&'a ()>, + data: Vec, +} + +#[cfg(feature = "loom")] +impl<'a> MappedBytesGuard<'a> { + fn new(data: Vec) -> Self { + Self { _phantom: std::marker::PhantomData, data } + } +} + +#[cfg(feature = "loom")] +impl<'a> std::ops::Deref for MappedBytesGuard<'a> { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + self.data.as_slice() + } +} + +#[cfg(not(feature = "loom"))] +type MappedBytesGuard<'a> = parking_lot::MappedRwLockReadGuard<'a, [u8]>; + impl LogQuery for RwLock { + type ValueRef<'a> = MappedBytesGuard<'a>; + fn with_index R>( &self, table: IndexTableId, index: u64, f: F, ) -> Option { - self.read().with_index(table, index, f) + (&*self.read()).with_index(table, index, f) } fn value(&self, table: ValueTableId, index: u64, dest: &mut [u8]) -> bool { - self.read().value(table, index, dest) + (&*self.read()).value(table, index, dest) + } + + #[cfg(not(feature = "loom"))] + fn value_ref<'a>(&'a self, table: ValueTableId, index: u64) -> Option> { + let lock = + parking_lot::RwLockReadGuard::try_map(self.read(), |o| o.value_ref(table, index)); + lock.ok() + } + + #[cfg(feature = "loom")] + fn value_ref<'a>(&'a self, table: ValueTableId, index: u64) -> Option> { + self.read().value_ref(table, index).map(|o| MappedBytesGuard::new(o.to_vec())) } } impl LogQuery for LogOverlays { + type ValueRef<'a> = &'a [u8]; fn with_index R>( &self, table: IndexTableId, @@ -91,13 +156,16 @@ impl LogQuery for LogOverlays { f: F, ) -> Option { self.index - .get(&table) + .get(table.log_index()) .and_then(|o| o.map.get(&index).map(|(_id, _mask, data)| f(data))) } fn value(&self, table: ValueTableId, index: u64, dest: &mut [u8]) -> bool { let s = self; - if let Some(d) = s.value.get(&table).and_then(|o| o.map.get(&index).map(|(_id, data)| data)) + if let Some(d) = s + .value + .get(table.log_index()) + .and_then(|o| 
o.map.get(&index).map(|(_id, data)| data)) { let len = dest.len().min(d.len()); dest[0..len].copy_from_slice(&d[0..len]); @@ -106,6 +174,11 @@ impl LogQuery for LogOverlays { false } } + fn value_ref<'a>(&'a self, table: ValueTableId, index: u64) -> Option> { + self.value + .get(table.log_index()) + .and_then(|o| o.map.get(&index).map(|(_id, data)| data.as_slice())) + } } #[derive(Debug, Default)] @@ -241,7 +314,7 @@ impl<'a> LogReader<'a> { #[derive(Debug)] pub struct LogChange { local_index: HashMap, - local_values: HashMap, + local_values: HashMap, record_id: u64, dropped_tables: Vec, } @@ -256,7 +329,7 @@ impl LogChange { } } - pub fn local_values_changes(&self, id: ValueTableId) -> Option<&ValueLogOverlay> { + pub fn local_values_changes(&self, id: ValueTableId) -> Option<&ValueLogOverlayLocal> { self.local_values.get(&id) } @@ -313,7 +386,7 @@ impl LogChange { #[derive(Debug)] struct FlushedLog { index: HashMap, - values: HashMap, + values: HashMap, bytes: u64, } @@ -361,7 +434,23 @@ impl<'a> LogWriter<'a> { } } -impl<'a> LogQuery for LogWriter<'a> { +pub enum LogWriterValueGuard<'a> { + Local(&'a [u8]), + Overlay(MappedBytesGuard<'a>), +} + +impl std::ops::Deref for LogWriterValueGuard<'_> { + type Target = [u8]; + fn deref(&self) -> &[u8] { + match self { + LogWriterValueGuard::Local(data) => data, + LogWriterValueGuard::Overlay(data) => data.deref(), + } + } +} + +impl<'q> LogQuery for LogWriter<'q> { + type ValueRef<'a> = LogWriterValueGuard<'a> where Self: 'a; fn with_index R>( &self, table: IndexTableId, @@ -393,6 +482,19 @@ impl<'a> LogQuery for LogWriter<'a> { self.overlays.value(table, index, dest) } } + fn value_ref<'v>(&'v self, table: ValueTableId, index: u64) -> Option> { + self.log + .local_values + .get(&table) + .and_then(|o| { + o.map.get(&index).map(|(_id, data)| LogWriterValueGuard::Local(data.as_slice())) + }) + .or_else(|| { + self.overlays + .value_ref(table, index) + .map(|data| LogWriterValueGuard::Overlay(data)) + }) + } } // Identity hash. @@ -447,6 +549,10 @@ pub struct IndexLogOverlay { // We use identity hash for value overlay/log records so that writes to value tables are in order. 
#[derive(Debug, Default)] pub struct ValueLogOverlay { + pub map: HashMap)>, // index -> (record_id, entry) +} +#[derive(Debug, Default)] +pub struct ValueLogOverlayLocal { pub map: HashMap), BuildIdHash>, // index -> (record_id, entry) } @@ -510,7 +616,7 @@ impl Log { let next_log_id = if logs.is_empty() { 0 } else { max_log_id + 1 }; Ok(Log { - overlays: Default::default(), + overlays: RwLock::new(LogOverlays::with_columns(options.columns.len())), appending: RwLock::new(None), reading: RwLock::new(None), read_queue: RwLock::default(), @@ -575,9 +681,15 @@ impl Log { self.cleanup_queue.write().push_back((id, file)); } let mut overlays = self.overlays.write(); - overlays.index.clear(); - overlays.value.clear(); - overlays.last_record_id.clear(); + for o in overlays.index.iter_mut() { + o.map.clear(); + } + for o in overlays.value.iter_mut() { + o.map.clear(); + } + for r in overlays.last_record_ids.iter_mut() { + *r = 0; + } self.dirty.store(false, Ordering::Relaxed); } @@ -615,13 +727,13 @@ impl Log { let mut total_index = 0; for (id, overlay) in index.into_iter() { total_index += overlay.map.len(); - overlays.index.entry(id).or_default().map.extend(overlay.map.into_iter()); + overlays.index[id.log_index()].map.extend(overlay.map.into_iter()); } let mut total_value = 0; for (id, overlay) in values.into_iter() { total_value += overlay.map.len(); - overlays.last_record_id.insert(id.col(), record_id); - overlays.value.entry(id).or_default().map.extend(overlay.map.into_iter()); + overlays.last_record_ids[id.col() as usize] = record_id; + overlays.value[id.log_index()].map.extend(overlay.map.into_iter()); } log::debug!( @@ -642,7 +754,7 @@ impl Log { } let mut overlays = self.overlays.write(); for (table, index) in cleared.index.into_iter() { - if let Some(ref mut overlay) = overlays.index.get_mut(&table) { + if let Some(ref mut overlay) = overlays.index.get_mut(table.log_index()) { if let std::collections::hash_map::Entry::Occupied(e) = overlay.map.entry(index) { if e.get().0 == record_id { e.remove_entry(); @@ -651,7 +763,7 @@ impl Log { } } for (table, index) in cleared.values.into_iter() { - if let Some(ref mut overlay) = overlays.value.get_mut(&table) { + if let Some(ref mut overlay) = overlays.value.get_mut(table.log_index()) { if let std::collections::hash_map::Entry::Occupied(e) = overlay.map.entry(index) { if e.get().0 == record_id { e.remove_entry(); @@ -659,8 +771,31 @@ impl Log { } } } - // Cleanup index overlays - overlays.index.retain(|_, overlay| !overlay.map.is_empty()); + // Reclaim overlay memory + for (i, o) in overlays.index.iter_mut().enumerate() { + if o.map.capacity() > o.map.len() * INDEX_OVERLAY_RECLAIM_FACTOR { + log::trace!( + "Schrinking index overlay {}: {}/{}", + IndexTableId::from_log_index(i), + o.map.len(), + o.map.capacity(), + ); + o.map.shrink_to_fit(); + } + } + for (i, o) in overlays.value.iter_mut().enumerate() { + if o.map.capacity() > VALUE_OVERLAY_MIN_RECLAIM_ITEMS && + o.map.capacity() > o.map.len() * VALUE_OVERLAY_RECLAIM_FACTOR + { + log::trace!( + "Schrinking value overlay {}: {}/{}", + ValueTableId::from_log_index(i), + o.map.len(), + o.map.capacity(), + ); + o.map.shrink_to_fit(); + } + } } pub fn flush_one(&self, min_size: u64) -> Result { diff --git a/src/table.rs b/src/table.rs index f81113cc..c75ebec0 100644 --- a/src/table.rs +++ b/src/table.rs @@ -47,15 +47,14 @@ use crate::{ column::ColId, display::hex, - error::{try_io, Result}, - log::{LogQuery, LogReader, LogWriter}, + error::Result, + log::{LogOverlays, LogQuery, LogReader, LogWriter}, 
options::ColumnOptions as Options, parking_lot::RwLock, table::key::{TableKey, TableKeyQuery, PARTIAL_SIZE}, }; use std::{ convert::TryInto, - io::Read, mem::MaybeUninit, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, @@ -117,6 +116,20 @@ impl TableId { pub fn as_u16(&self) -> u16 { self.0 } + + pub fn log_index(&self) -> usize { + self.col() as usize * SIZE_TIERS + self.size_tier() as usize + } + + pub const fn max_log_tables(num_columns: usize) -> usize { + SIZE_TIERS * num_columns + } + + pub fn from_log_index(i: usize) -> Self { + let col = i / SIZE_TIERS; + let tier = i % SIZE_TIERS; + Self::new(col as ColId, tier as u8) + } } impl std::fmt::Display for TableId { @@ -156,11 +169,12 @@ impl Header { } } -pub struct Entry + AsMut<[u8]>>(usize, B); +pub struct Entry>(usize, B); #[cfg(feature = "loom")] pub type FullEntry = Entry>; #[cfg(not(feature = "loom"))] pub type FullEntry = Entry<[u8; MAX_ENTRY_BUF_SIZE]>; +pub type EntryRef<'a> = Entry<&'a [u8]>; type PartialEntry = Entry<[u8; 10]>; type PartialKeyEntry = Entry<[u8; 40]>; // 2 + 4 + 26 + 8 @@ -186,7 +200,7 @@ impl Entry<[u8; MAX_ENTRY_BUF_SIZE]> { } } -impl + AsMut<[u8]>> Entry { +impl> Entry { #[inline(always)] pub fn check_remaining_len( &self, @@ -212,12 +226,6 @@ impl + AsMut<[u8]>> Entry { self.0 } - pub fn write_slice(&mut self, buf: &[u8]) { - let start = self.0; - self.0 += buf.len(); - self.1.as_mut()[start..self.0].copy_from_slice(buf); - } - pub fn read_slice(&mut self, size: usize) -> &[u8] { let start = self.0; self.0 += size; @@ -228,10 +236,6 @@ impl + AsMut<[u8]>> Entry { &self.1.as_ref()[0..SIZE_SIZE] == TOMBSTONE } - fn write_tombstone(&mut self) { - self.write_slice(TOMBSTONE); - } - fn is_multipart(&self) -> bool { &self.1.as_ref()[0..SIZE_SIZE] == MULTIPART } @@ -240,10 +244,6 @@ impl + AsMut<[u8]>> Entry { &self.1.as_ref()[0..SIZE_SIZE] == MULTIPART_V4 } - fn write_multipart(&mut self) { - self.write_slice(MULTIPART); - } - fn is_multihead_compressed(&self) -> bool { &self.1.as_ref()[0..SIZE_SIZE] == MULTIHEAD_COMPRESSED } @@ -256,14 +256,6 @@ impl + AsMut<[u8]>> Entry { &self.1.as_ref()[0..SIZE_SIZE] == MULTIHEAD_V4 } - fn write_multihead(&mut self) { - self.write_slice(MULTIHEAD); - } - - fn write_multihead_compressed(&mut self) { - self.write_slice(MULTIHEAD_COMPRESSED); - } - fn is_multi(&self, db_version: u32) -> bool { self.is_multipart() || self.is_multihead() || @@ -280,13 +272,6 @@ impl + AsMut<[u8]>> Entry { self.0 += SIZE_SIZE; } - fn write_size(&mut self, mut size: u16, compressed: bool) { - if compressed { - size |= COMPRESSED_MASK; - } - self.write_slice(&size.to_le_bytes()); - } - pub fn read_u64(&mut self) -> u64 { u64::from_le_bytes(self.read_slice(8).try_into().unwrap()) } @@ -303,30 +288,14 @@ impl + AsMut<[u8]>> Entry { self.skip_u64() } - pub fn write_u64(&mut self, next_index: u64) { - self.write_slice(&next_index.to_le_bytes()); - } - - fn write_next(&mut self, next_index: u64) { - self.write_u64(next_index) - } - pub fn read_u32(&mut self) -> u32 { u32::from_le_bytes(self.read_slice(REFS_SIZE).try_into().unwrap()) } - pub fn write_u32(&mut self, next_index: u32) { - self.write_slice(&next_index.to_le_bytes()); - } - fn read_rc(&mut self) -> u32 { self.read_u32() } - fn write_rc(&mut self, rc: u32) { - self.write_slice(&rc.to_le_bytes()); - } - fn read_partial(&mut self) -> &[u8] { self.read_slice(PARTIAL_SIZE) } @@ -334,6 +303,52 @@ impl + AsMut<[u8]>> Entry { fn remaining_to(&self, end: usize) -> &[u8] { &self.1.as_ref()[self.0..end] } +} + +impl + AsMut<[u8]>> Entry { + pub fn 
write_slice(&mut self, buf: &[u8]) { + let start = self.0; + self.0 += buf.len(); + self.1.as_mut()[start..self.0].copy_from_slice(buf); + } + + fn write_tombstone(&mut self) { + self.write_slice(TOMBSTONE); + } + + fn write_multipart(&mut self) { + self.write_slice(MULTIPART); + } + + fn write_multihead(&mut self) { + self.write_slice(MULTIHEAD); + } + + fn write_multihead_compressed(&mut self) { + self.write_slice(MULTIHEAD_COMPRESSED); + } + + fn write_size(&mut self, mut size: u16, compressed: bool) { + if compressed { + size |= COMPRESSED_MASK; + } + self.write_slice(&size.to_le_bytes()); + } + pub fn write_u64(&mut self, next_index: u64) { + self.write_slice(&next_index.to_le_bytes()); + } + + fn write_next(&mut self, next_index: u64) { + self.write_u64(next_index) + } + + pub fn write_u32(&mut self, next_index: u32) { + self.write_slice(&next_index.to_le_bytes()); + } + + fn write_rc(&mut self, rc: u32) { + self.write_slice(&rc.to_le_bytes()); + } pub fn inner_mut(&mut self) -> &mut B { &mut self.1 @@ -346,7 +361,7 @@ impl + AsMut<[u8]>> AsMut<[u8]> for Entry { } } -impl + AsMut<[u8]>> AsRef<[u8]> for Entry { +impl> AsRef<[u8]> for Entry { fn as_ref(&self) -> &[u8] { self.1.as_ref() } @@ -386,9 +401,9 @@ impl ValueTable { let file = crate::file::TableFile::open(filepath, entry_size, id)?; let mut filled = 1; let mut last_removed = 0; - if let Some(file) = &mut *file.file.write() { + if file.map.read().is_some() { let mut header = Header::default(); - try_io!(file.read_exact(&mut header.0)); + file.read_at(&mut header.0, 0)?; last_removed = header.last_removed(); filled = header.filled(); if filled == 0 { @@ -435,14 +450,20 @@ impl ValueTable { log: &impl LogQuery, mut f: impl FnMut(&[u8]) -> bool, ) -> Result<(u32, bool)> { - let mut buf = FullEntry::new_uninit_full_entry(); let mut part = 0; let mut compressed = false; let mut rc = 1; let entry_size = self.entry_size as usize; loop { - let buf = if log.value(self.id, index, buf.as_mut()) { - &mut buf + let vbuf = log.value_ref(self.id, index); + let buf: &[u8] = if let Some(buf) = vbuf.as_deref() { + log::trace!( + target: "parity-db", + "{}: Found in overlay {}", + self.id, + index, + ); + buf } else { log::trace!( target: "parity-db", @@ -450,9 +471,9 @@ impl ValueTable { self.id, index, ); - self.file.read_at(&mut buf[0..entry_size], index * self.entry_size as u64)?; - &mut buf + self.file.slice_at(index * self.entry_size as u64, entry_size) }; + let mut buf = EntryRef::new(buf); buf.set_offset(0); @@ -486,11 +507,11 @@ impl ValueTable { } match key { TableKeyQuery::Fetch(Some(to_fetch)) => { - **to_fetch = TableKey::fetch_partial(buf)?; + **to_fetch = TableKey::fetch_partial(&mut buf)?; }, TableKeyQuery::Fetch(None) => (), TableKeyQuery::Check(k) => { - let to_fetch = k.fetch(buf)?; + let to_fetch = k.fetch(&mut buf)?; if !k.compare(&to_fetch) { log::debug!( target: "parity-db", @@ -959,7 +980,7 @@ impl ValueTable { } pub fn refresh_metadata(&self) -> Result<()> { - if self.file.file.read().is_none() { + if self.file.map.read().is_none() { return Ok(()) } let mut header = Header::default(); @@ -1033,7 +1054,7 @@ impl ValueTable { } pub fn is_init(&self) -> bool { - self.file.file.read().is_some() + self.file.map.read().is_some() } pub fn init_with_entry(&self, entry: &[u8]) -> Result<()> { @@ -1048,7 +1069,7 @@ impl ValueTable { fn do_init_with_entry(&self, entry: &[u8]) -> Result<()> { self.file.grow(self.entry_size)?; - let empty_overlays = RwLock::new(Default::default()); + let empty_overlays = 
RwLock::new(LogOverlays::with_columns(0)); let mut log = LogWriter::new(&empty_overlays, 0); let at = self.overwrite_chain(&TableKey::NoHash, entry, &mut log, None, false)?; self.complete_plan(&mut log)?; @@ -1084,7 +1105,7 @@ impl ValueTable { } pub mod key { - use super::FullEntry; + use super::{EntryRef, FullEntry}; use crate::{Key, Result}; pub const PARTIAL_SIZE: usize = 26; @@ -1118,7 +1139,7 @@ pub mod key { } } - pub fn fetch_partial(buf: &mut FullEntry) -> Result<[u8; PARTIAL_SIZE]> { + pub fn fetch_partial<'a>(buf: &mut EntryRef<'a>) -> Result<[u8; PARTIAL_SIZE]> { let mut result = [0u8; PARTIAL_SIZE]; if buf.1.len() >= PARTIAL_SIZE { let pks = buf.read_partial(); @@ -1128,7 +1149,7 @@ pub mod key { Err(crate::error::Error::InvalidValueData) } - pub fn fetch(&self, buf: &mut FullEntry) -> Result> { + pub fn fetch<'a>(&self, buf: &mut EntryRef<'a>) -> Result> { match self { TableKey::Partial(_k) => Ok(Some(Self::fetch_partial(buf)?)), TableKey::NoHash => Ok(None),
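
The value tables are now accessed through a memory map that reserves more address space than the file currently occupies (RESERVE_ADDRESS_SPACE), so a routine grow() only extends the file length and the existing mapping keeps covering the new pages; a remap is needed only once the reservation is exhausted, and the old map is then flushed and deliberately leaked because concurrent readers may still hold slices into it. Below is a minimal sketch of that scheme, assuming the memmap2 crate; the constants, the demo file path and the standalone main() are illustrative, not the crate's actual items.

    use std::fs::OpenOptions;

    // Illustrative constants; the patch uses 1 GiB (64 MiB under test) and 256 KiB.
    const RESERVE_ADDRESS_SPACE: usize = 1024 * 1024 * 1024;
    const GROW_SIZE_BYTES: u64 = 256 * 1024;

    fn main() -> std::io::Result<()> {
        // Demo file path; the real tables live under the database directory.
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .open("/tmp/mmap_grow_demo.bin")?;
        file.set_len(GROW_SIZE_BYTES)?;

        // Map more address space than the file currently has, so growing the
        // file does not invalidate the mapping until the reservation is used up.
        let len = file.metadata()?.len();
        let mut map = unsafe {
            memmap2::MmapOptions::new()
                .len(len as usize + RESERVE_ADDRESS_SPACE)
                .map_mut(&file)?
        };

        // Routine grow: just extend the file. Pages past the old end become
        // accessible through the existing mapping.
        let new_len = len + GROW_SIZE_BYTES;
        file.set_len(new_len)?;

        // Only if the file outgrows the reserved mapping do we remap.
        if map.len() < new_len as usize {
            let new_map = unsafe {
                memmap2::MmapOptions::new()
                    .len(new_len as usize + RESERVE_ADDRESS_SPACE)
                    .map_mut(&file)?
            };
            let old_map = std::mem::replace(&mut map, new_map);
            old_map.flush()?;
            // Concurrent readers may still hold slices into the old mapping,
            // so leak it instead of unmapping.
            std::mem::forget(old_map);
        }
        Ok(())
    }

Reserving address space up front is cheap on 64-bit targets, since untouched pages are never committed; it mainly trades virtual address space for fewer remaps.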
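
Both TableId types gain log_index()/from_log_index() so the per-table log overlays can live in a flat Vec indexed by (column, size tier) instead of a HashMap keyed by table id. A sketch of that folding follows, with simplified types and SIZE_TIERS standing in for the crate's tier count (assumed here to be 16).

    const SIZE_TIERS: usize = 16; // assumed value, standing in for the crate's constant

    #[derive(Clone, Copy, PartialEq, Debug)]
    struct TableId {
        col: u8,
        tier: u8,
    }

    impl TableId {
        fn log_index(&self) -> usize {
            self.col as usize * SIZE_TIERS + self.tier as usize
        }

        fn from_log_index(i: usize) -> Self {
            TableId { col: (i / SIZE_TIERS) as u8, tier: (i % SIZE_TIERS) as u8 }
        }

        const fn max_log_tables(num_columns: usize) -> usize {
            SIZE_TIERS * num_columns
        }
    }

    fn main() {
        // One overlay slot per possible table: lookup is a plain Vec index
        // instead of a hash of the table id.
        let num_columns = 4;
        let overlays: Vec<Vec<u8>> = vec![Vec::new(); TableId::max_log_tables(num_columns)];

        let id = TableId { col: 2, tier: 5 };
        assert!(id.log_index() < overlays.len());
        assert_eq!(TableId::from_log_index(id.log_index()), id);
    }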
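
The new LogQuery::value_ref path eliminates the extra copy on read: instead of copying an overlay entry into a caller buffer, it hands back a guard that borrows the bytes while the overlay lock is held (parking_lot's RwLockReadGuard::try_map on regular builds, an owned Vec copy under loom where guard projection is unavailable), and reads that miss the overlay borrow the mapped file bytes directly via TableFile::slice_at. A minimal sketch of the non-loom shape, assuming the parking_lot crate and an illustrative Overlay type:

    use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard};
    use std::collections::HashMap;

    // Illustrative stand-in for a value overlay: index -> entry bytes.
    #[derive(Default)]
    struct Overlay {
        map: HashMap<u64, Vec<u8>>,
    }

    // Project the read guard down to the stored bytes; the lock stays held for
    // as long as the returned guard is alive, so no copy is needed.
    fn value_ref(overlay: &RwLock<Overlay>, index: u64) -> Option<MappedRwLockReadGuard<'_, [u8]>> {
        RwLockReadGuard::try_map(overlay.read(), |o| o.map.get(&index).map(|v| v.as_slice())).ok()
    }

    fn main() {
        let overlay = RwLock::new(Overlay::default());
        overlay.write().map.insert(7, vec![1, 2, 3]);

        if let Some(bytes) = value_ref(&overlay, 7) {
            assert_eq!(&*bytes, &[1u8, 2, 3]);
        }
        assert!(value_ref(&overlay, 8).is_none());
    }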
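
Since the overlays are now long-lived Vec slots rather than HashMap entries that get dropped wholesale, end_record shrinks any overlay map whose allocated capacity has grown far beyond its live contents (less than 1/10 occupancy, with a minimum item count for value overlays so small maps are left alone). A sketch of that heuristic with illustrative names:

    use std::collections::HashMap;
    use std::hash::Hash;

    // Constants mirroring the patch: reclaim once less than 1/10 of the
    // capacity is used, and only for reasonably large overlays.
    const RECLAIM_FACTOR: usize = 10;
    const MIN_RECLAIM_ITEMS: usize = 10240;

    fn maybe_reclaim<K: Eq + Hash, V>(map: &mut HashMap<K, V>) {
        if map.capacity() > MIN_RECLAIM_ITEMS && map.capacity() > map.len() * RECLAIM_FACTOR {
            map.shrink_to_fit();
        }
    }

    fn main() {
        let mut overlay: HashMap<u64, Vec<u8>> = HashMap::new();
        for i in 0..100_000u64 {
            overlay.insert(i, vec![0u8; 32]);
        }
        // Most entries have been flushed to the tables and removed from the overlay.
        overlay.retain(|&k, _| k < 100);

        let before = overlay.capacity();
        maybe_reclaim(&mut overlay);
        assert!(overlay.capacity() <= before);
        assert_eq!(overlay.len(), 100);
    }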