From cbe07fae943948a480743ae407a4113251931a8e Mon Sep 17 00:00:00 2001 From: arkpar Date: Sun, 9 Jul 2023 23:07:23 +0200 Subject: [PATCH 01/17] Experimental mmap IO for tables --- src/file.rs | 130 +++++++++++++++++++++++---------------------------- src/table.rs | 11 ++--- 2 files changed, 64 insertions(+), 77 deletions(-) diff --git a/src/file.rs b/src/file.rs index 0d886d6c..213e0674 100644 --- a/src/file.rs +++ b/src/file.rs @@ -4,8 +4,8 @@ //! Utilities for db file. use crate::{ - error::{try_io, Error, Result}, - parking_lot::{RwLock, RwLockUpgradableReadGuard, RwLockWriteGuard}, + error::{try_io, Result}, + parking_lot::RwLock, table::TableId, }; use std::sync::atomic::{AtomicU64, Ordering}; @@ -36,33 +36,25 @@ fn disable_read_ahead(_file: &std::fs::File) -> std::io::Result<()> { Ok(()) } -// `File::sync_data` uses F_FULLSYNC fcntl on MacOS. It it supposed to be -// the safest way to make sure data is fully persisted. However starting from -// MacOS 11.0 it severely degrades parallel write performance, even when writing to -// other files. Regular `fsync` is good enough for our use case. -// SSDs used in modern macs seem to be able to flush data even on unexpected power loss. -// We performed some testing with power shutdowns and kernel panics on both mac hardware -// and VMs and in all cases `fsync` was enough to prevent data corruption. 
-#[cfg(target_os = "macos")] -fn fsync(file: &std::fs::File) -> std::io::Result<()> { - use std::os::unix::io::AsRawFd; - if unsafe { libc::fsync(file.as_raw_fd()) } != 0 { - Err(std::io::Error::last_os_error()) - } else { - Ok(()) +#[cfg(unix)] +fn madvise_random(map: &mut memmap2::MmapMut) { + unsafe { + libc::madvise( + map.as_mut_ptr() as _, + map.len(), + libc::MADV_RANDOM, + ); } } -#[cfg(not(target_os = "macos"))] -fn fsync(file: &std::fs::File) -> std::io::Result<()> { - file.sync_data() -} +#[cfg(not(unix))] +fn madvise_random(_id: TableId, _map: &mut memmap2::MmapMut) {} const GROW_SIZE_BYTES: u64 = 256 * 1024; #[derive(Debug)] pub struct TableFile { - pub file: RwLock>, + pub map: RwLock>, pub path: std::path::PathBuf, pub capacity: AtomicU64, pub id: TableId, @@ -71,7 +63,7 @@ pub struct TableFile { impl TableFile { pub fn open(filepath: std::path::PathBuf, entry_size: u16, id: TableId) -> Result { let mut capacity = 0u64; - let file = if std::fs::metadata(&filepath).is_ok() { + let map = if std::fs::metadata(&filepath).is_ok() { let file = try_io!(std::fs::OpenOptions::new() .read(true) .write(true) @@ -85,13 +77,15 @@ impl TableFile { } else { capacity = len / entry_size as u64; } - Some(file) + let mut map = try_io!(unsafe { memmap2::MmapMut::map_mut(&file) }); + madvise_random(&mut map); + Some((map, file)) } else { None }; Ok(TableFile { path: filepath, - file: RwLock::new(file), + map: RwLock::new(map), capacity: AtomicU64::new(capacity), id, }) @@ -110,53 +104,37 @@ impl TableFile { #[cfg(unix)] pub fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<()> { - use std::os::unix::fs::FileExt; - try_io!(self - .file - .read() - .as_ref() - .ok_or_else(|| Error::Corruption("File does not exist.".into()))? 
- .read_exact_at(buf, offset)); + let offset = offset as usize; + let map = self.map.read(); + let (map, _) = map.as_ref().unwrap(); + buf.copy_from_slice(&map[offset..offset + buf.len()]); Ok(()) } #[cfg(unix)] pub fn write_at(&self, buf: &[u8], offset: u64) -> Result<()> { - use std::os::unix::fs::FileExt; - try_io!(self.file.read().as_ref().unwrap().write_all_at(buf, offset)); + let map = self.map.read(); + let (map, _) = map.as_ref().unwrap(); + let offset = offset as usize; + + // Nasty mutable pointer cast. We do ensure that all chunks that are being written are + // accessed through the overlay in other threads. + let ptr: *mut u8 = map.as_ptr() as *mut u8; + let data: &mut [u8] = unsafe { + let ptr = ptr.add(offset); + std::slice::from_raw_parts_mut(ptr, buf.len()) + }; + data.copy_from_slice(buf); Ok(()) } #[cfg(windows)] pub fn read_at(&self, mut buf: &mut [u8], mut offset: u64) -> Result<()> { - use crate::error::Error; - use std::{io, os::windows::fs::FileExt}; - - let file = self.file.read(); - let file = file.as_ref().ok_or_else(|| Error::Corruption("File does not exist.".into()))?; - - while !buf.is_empty() { - match file.seek_read(buf, offset) { - Ok(0) => break, - Ok(n) => { - buf = &mut buf[n..]; - offset += n as u64; - }, - Err(ref e) if e.kind() == io::ErrorKind::Interrupted => { - // Try again - }, - Err(e) => return Err(Error::Io(e)), - } - } - - if !buf.is_empty() { - Err(Error::Io(io::Error::new( - io::ErrorKind::UnexpectedEof, - "failed to fill whole buffer", - ))) - } else { - Ok(()) - } + let (map, _) = self.map.read().as_ref().unwrap(); + let map = map.as_mut().unwrap(); + let offset = offset as usize; + map[offset..offset + buf.len()].copy_from_slice(buf); + Ok(()) } #[cfg(windows)] @@ -193,26 +171,36 @@ impl TableFile { capacity += GROW_SIZE_BYTES / entry_size as u64; self.capacity.store(capacity, Ordering::Relaxed); - let mut file = self.file.upgradable_read(); - if file.is_none() { - let mut wfile = 
RwLockUpgradableReadGuard::upgrade(file); - *wfile = Some(self.create_file()?); - file = RwLockWriteGuard::downgrade_to_upgradable(wfile); + let mut map_and_file = self.map.write(); + match map_and_file.as_mut() { + None => { + let file = self.create_file()?; + try_io!(file.set_len(capacity * entry_size as u64)); + let mut map = try_io!(unsafe { memmap2::MmapMut::map_mut(&file) }); + madvise_random(&mut map); + *map_and_file = Some((map, file)); + } + Some((map, file)) => { + try_io!(file.set_len(capacity * entry_size as u64)); + let mut m = try_io!(unsafe { memmap2::MmapMut::map_mut(&*file) }); + madvise_random(&mut m); + *map = m; + } } - try_io!(file.as_ref().unwrap().set_len(capacity * entry_size as u64)); Ok(()) } pub fn flush(&self) -> Result<()> { - if let Some(file) = self.file.read().as_ref() { - try_io!(fsync(file)); + if let Some((map, _)) = self.map.read().as_ref() { + try_io!(map.flush()); } Ok(()) } pub fn remove(&self) -> Result<()> { - let mut file = self.file.write(); - if let Some(file) = file.take() { + let mut map = self.map.write(); + if let Some((map, file)) = map.take() { + drop(map); drop(file); try_io!(std::fs::remove_file(&self.path)); } diff --git a/src/table.rs b/src/table.rs index f81113cc..4de1d13a 100644 --- a/src/table.rs +++ b/src/table.rs @@ -47,7 +47,7 @@ use crate::{ column::ColId, display::hex, - error::{try_io, Result}, + error::Result, log::{LogQuery, LogReader, LogWriter}, options::ColumnOptions as Options, parking_lot::RwLock, @@ -55,7 +55,6 @@ use crate::{ }; use std::{ convert::TryInto, - io::Read, mem::MaybeUninit, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, @@ -386,9 +385,9 @@ impl ValueTable { let file = crate::file::TableFile::open(filepath, entry_size, id)?; let mut filled = 1; let mut last_removed = 0; - if let Some(file) = &mut *file.file.write() { + if file.map.read().is_some() { let mut header = Header::default(); - try_io!(file.read_exact(&mut header.0)); + file.read_at(&mut header.0, 0)?; last_removed = 
header.last_removed(); filled = header.filled(); if filled == 0 { @@ -959,7 +958,7 @@ impl ValueTable { } pub fn refresh_metadata(&self) -> Result<()> { - if self.file.file.read().is_none() { + if self.file.map.read().is_none() { return Ok(()) } let mut header = Header::default(); @@ -1033,7 +1032,7 @@ impl ValueTable { } pub fn is_init(&self) -> bool { - self.file.file.read().is_some() + self.file.map.read().is_some() } pub fn init_with_entry(&self, entry: &[u8]) -> Result<()> { From 66af3459f71e19ebf3d7d3405b761a6254d081e8 Mon Sep 17 00:00:00 2001 From: arkpar Date: Mon, 17 Jul 2023 21:58:32 +0200 Subject: [PATCH 02/17] Eliminate extra copy on read --- admin/src/lib.rs | 4 +- src/file.rs | 55 +++++---------------- src/index.rs | 20 ++------ src/log.rs | 50 +++++++++++++++++-- src/table.rs | 124 +++++++++++++++++++++++++---------------------- 5 files changed, 130 insertions(+), 123 deletions(-) diff --git a/admin/src/lib.rs b/admin/src/lib.rs index 6236bd38..c15f0160 100644 --- a/admin/src/lib.rs +++ b/admin/src/lib.rs @@ -114,12 +114,12 @@ pub fn run() -> Result<(), String> { let mut db_options = options.clone(); if args.compress { - for mut c in &mut db_options.columns { + for c in &mut db_options.columns { c.compression = parity_db::CompressionType::Lz4; } } if args.uniform { - for mut c in &mut db_options.columns { + for c in &mut db_options.columns { c.uniform = true; } } diff --git a/src/file.rs b/src/file.rs index 213e0674..9f68df3d 100644 --- a/src/file.rs +++ b/src/file.rs @@ -37,7 +37,7 @@ fn disable_read_ahead(_file: &std::fs::File) -> std::io::Result<()> { } #[cfg(unix)] -fn madvise_random(map: &mut memmap2::MmapMut) { +pub fn madvise_random(map: &mut memmap2::MmapMut) { unsafe { libc::madvise( map.as_mut_ptr() as _, @@ -48,7 +48,7 @@ fn madvise_random(map: &mut memmap2::MmapMut) { } #[cfg(not(unix))] -fn madvise_random(_id: TableId, _map: &mut memmap2::MmapMut) {} +pub fn madvise_random(_id: TableId, _map: &mut memmap2::MmapMut) {} const 
GROW_SIZE_BYTES: u64 = 256 * 1024; @@ -102,7 +102,6 @@ impl TableFile { Ok(file) } - #[cfg(unix)] pub fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<()> { let offset = offset as usize; let map = self.map.read(); @@ -111,7 +110,17 @@ impl TableFile { Ok(()) } - #[cfg(unix)] + pub fn slice_at(&self, offset: u64, len: usize) -> &[u8] { + let offset = offset as usize; + let map = self.map.read(); + let (map, _) = map.as_ref().unwrap(); + let data: &[u8] = unsafe { + let ptr = map.as_ptr().add(offset); + std::slice::from_raw_parts(ptr, len) + }; + data + } + pub fn write_at(&self, buf: &[u8], offset: u64) -> Result<()> { let map = self.map.read(); let (map, _) = map.as_ref().unwrap(); @@ -128,44 +137,6 @@ impl TableFile { Ok(()) } - #[cfg(windows)] - pub fn read_at(&self, mut buf: &mut [u8], mut offset: u64) -> Result<()> { - let (map, _) = self.map.read().as_ref().unwrap(); - let map = map.as_mut().unwrap(); - let offset = offset as usize; - map[offset..offset + buf.len()].copy_from_slice(buf); - Ok(()) - } - - #[cfg(windows)] - pub fn write_at(&self, mut buf: &[u8], mut offset: u64) -> Result<()> { - use crate::error::Error; - use std::{io, os::windows::fs::FileExt}; - - let file = self.file.read(); - let file = file.as_ref().unwrap(); - - while !buf.is_empty() { - match file.seek_write(buf, offset) { - Ok(0) => - return Err(Error::Io(io::Error::new( - io::ErrorKind::WriteZero, - "failed to write whole buffer", - ))), - Ok(n) => { - buf = &buf[n..]; - offset += n as u64; - }, - Err(ref e) if e.kind() == io::ErrorKind::Interrupted => { - // Try again - }, - Err(e) => return Err(Error::Io(e)), - } - } - - Ok(()) - } - pub fn grow(&self, entry_size: u16) -> Result<()> { let mut capacity = self.capacity.load(Ordering::Relaxed); capacity += GROW_SIZE_BYTES / entry_size as u64; diff --git a/src/index.rs b/src/index.rs index 5cec3318..5108141c 100644 --- a/src/index.rs +++ b/src/index.rs @@ -9,6 +9,7 @@ use crate::{ parking_lot::{RwLock, RwLockUpgradableReadGuard, 
RwLockWriteGuard}, stats::{self, ColumnStats}, table::{key::TableKey, SIZE_TIERS_BITS}, + file::madvise_random, Key, }; #[cfg(target_arch = "x86")] @@ -142,20 +143,6 @@ fn file_size(index_bits: u8) -> u64 { total_entries(index_bits) * 8 + META_SIZE as u64 } -#[cfg(unix)] -fn madvise_random(id: TableId, map: &mut memmap2::MmapMut) { - unsafe { - libc::madvise( - map.as_mut_ptr() as _, - file_size(id.index_bits()) as usize, - libc::MADV_RANDOM, - ); - } -} - -#[cfg(not(unix))] -fn madvise_random(_id: TableId, _map: &mut memmap2::MmapMut) {} - #[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] pub struct TableId(u16); @@ -216,7 +203,7 @@ impl IndexTable { try_io!(file.set_len(file_size(id.index_bits()))); let mut map = try_io!(unsafe { memmap2::MmapMut::map_mut(&file) }); - madvise_random(id, &mut map); + madvise_random(&mut map); log::debug!(target: "parity-db", "Opened existing index {}", id); Ok(Some(IndexTable { id, path, map: RwLock::new(Some(map)) })) } @@ -564,7 +551,7 @@ impl IndexTable { log::debug!(target: "parity-db", "Created new index {}", self.id); try_io!(file.set_len(file_size(self.id.index_bits()))); let mut mmap = try_io!(unsafe { memmap2::MmapMut::map_mut(&file) }); - madvise_random(self.id, &mut mmap); + madvise_random(&mut mmap); *wmap = Some(mmap); map = RwLockWriteGuard::downgrade_to_upgradable(wmap); } @@ -641,7 +628,6 @@ mod test { use super::*; use rand::{Rng, SeedableRng}; use std::path::PathBuf; - #[cfg(feature = "bench")] use test::Bencher; #[cfg(feature = "bench")] diff --git a/src/log.rs b/src/log.rs index 19e54ba9..6b057186 100644 --- a/src/log.rs +++ b/src/log.rs @@ -1,6 +1,8 @@ // Copyright 2021-2022 Parity Technologies (UK) Ltd. // This file is dual-licensed as Apache-2.0 or MIT. 
+use parking_lot::{RwLockReadGuard, MappedRwLockReadGuard}; + use crate::{ column::ColId, error::{try_io, Error, Result}, @@ -46,6 +48,8 @@ pub enum LogAction { } pub trait LogQuery { + type ValueRef<'a>: std::ops::Deref where Self: 'a; + fn with_index R>( &self, table: IndexTableId, @@ -53,6 +57,7 @@ pub trait LogQuery { f: F, ) -> Option; fn value(&self, table: ValueTableId, index: u64, dest: &mut [u8]) -> bool; + fn value_ref<'a>(&'a self, table: ValueTableId, index: u64) -> Option>; } #[derive(Default, Debug)] @@ -69,21 +74,29 @@ impl LogOverlays { } impl LogQuery for RwLock { + type ValueRef<'a> = MappedRwLockReadGuard<'a, [u8]>; + fn with_index R>( &self, table: IndexTableId, index: u64, f: F, ) -> Option { - self.read().with_index(table, index, f) + (&*self.read()).with_index(table, index, f) } fn value(&self, table: ValueTableId, index: u64, dest: &mut [u8]) -> bool { - self.read().value(table, index, dest) + (&*self.read()).value(table, index, dest) + } + + fn value_ref<'a>(&'a self, table: ValueTableId, index: u64) -> Option> { + let lock = RwLockReadGuard::try_map(self.read(), |o| o.value_ref(table, index)); + lock.ok() } } impl LogQuery for LogOverlays { + type ValueRef<'a> = &'a [u8]; fn with_index R>( &self, table: IndexTableId, @@ -106,6 +119,10 @@ impl LogQuery for LogOverlays { false } } + fn value_ref<'a>(&'a self, table: ValueTableId, index: u64) -> Option> { + log::debug!(target: "parity-db", "Query overlay index {}, record {}, size={}/{}", table, index, self.value.len(), self.value.get(&table).map_or(0, |o| o.map.len())); + self.value.get(&table).and_then(|o| o.map.get(&index).map(|(_id, data)| data.as_slice())) + } } #[derive(Debug, Default)] @@ -361,7 +378,23 @@ impl<'a> LogWriter<'a> { } } -impl<'a> LogQuery for LogWriter<'a> { +pub enum LogWriterLock<'a> { + Local(&'a [u8]), + Overlay(MappedRwLockReadGuard<'a, [u8]>), +} + +impl std::ops::Deref for LogWriterLock<'_> { + type Target = [u8]; + fn deref(&self) -> &[u8] { + match self { + 
LogWriterLock::Local(data) => data, + LogWriterLock::Overlay(data) => data.deref(), + } + } +} + +impl<'q> LogQuery for LogWriter<'q> { + type ValueRef<'a> = LogWriterLock<'a> where Self: 'a; fn with_index R>( &self, table: IndexTableId, @@ -384,7 +417,7 @@ impl<'a> LogQuery for LogWriter<'a> { .log .local_values .get(&table) - .and_then(|o| o.map.get(&index).map(|(_id, data)| data)) + .and_then(|o| o.map.get(&index).map(|(_id, data)| data)) { let len = dest.len().min(d.len()); dest[0..len].copy_from_slice(&d[0..len]); @@ -393,6 +426,15 @@ impl<'a> LogQuery for LogWriter<'a> { self.overlays.value(table, index, dest) } } + fn value_ref<'v> (&'v self, table: ValueTableId, index: u64) -> Option> { + log::debug!(target: "parity-db", "Query local overlay index {}, record {}", table, index); + self + .log + .local_values + .get(&table) + .and_then(|o| o.map.get(&index).map(|(_id, data)| LogWriterLock::Local(data.as_slice()))) + .or_else(|| self.overlays.value_ref(table, index).map(|data| LogWriterLock::Overlay(data))) + } } // Identity hash. 
diff --git a/src/table.rs b/src/table.rs index 4de1d13a..73f9c638 100644 --- a/src/table.rs +++ b/src/table.rs @@ -155,11 +155,12 @@ impl Header { } } -pub struct Entry + AsMut<[u8]>>(usize, B); +pub struct Entry>(usize, B); #[cfg(feature = "loom")] pub type FullEntry = Entry>; #[cfg(not(feature = "loom"))] pub type FullEntry = Entry<[u8; MAX_ENTRY_BUF_SIZE]>; +pub type EntryRef<'a> = Entry<&'a [u8]>; type PartialEntry = Entry<[u8; 10]>; type PartialKeyEntry = Entry<[u8; 40]>; // 2 + 4 + 26 + 8 @@ -185,7 +186,7 @@ impl Entry<[u8; MAX_ENTRY_BUF_SIZE]> { } } -impl + AsMut<[u8]>> Entry { +impl> Entry { #[inline(always)] pub fn check_remaining_len( &self, @@ -211,12 +212,6 @@ impl + AsMut<[u8]>> Entry { self.0 } - pub fn write_slice(&mut self, buf: &[u8]) { - let start = self.0; - self.0 += buf.len(); - self.1.as_mut()[start..self.0].copy_from_slice(buf); - } - pub fn read_slice(&mut self, size: usize) -> &[u8] { let start = self.0; self.0 += size; @@ -227,10 +222,6 @@ impl + AsMut<[u8]>> Entry { &self.1.as_ref()[0..SIZE_SIZE] == TOMBSTONE } - fn write_tombstone(&mut self) { - self.write_slice(TOMBSTONE); - } - fn is_multipart(&self) -> bool { &self.1.as_ref()[0..SIZE_SIZE] == MULTIPART } @@ -239,10 +230,6 @@ impl + AsMut<[u8]>> Entry { &self.1.as_ref()[0..SIZE_SIZE] == MULTIPART_V4 } - fn write_multipart(&mut self) { - self.write_slice(MULTIPART); - } - fn is_multihead_compressed(&self) -> bool { &self.1.as_ref()[0..SIZE_SIZE] == MULTIHEAD_COMPRESSED } @@ -255,14 +242,6 @@ impl + AsMut<[u8]>> Entry { &self.1.as_ref()[0..SIZE_SIZE] == MULTIHEAD_V4 } - fn write_multihead(&mut self) { - self.write_slice(MULTIHEAD); - } - - fn write_multihead_compressed(&mut self) { - self.write_slice(MULTIHEAD_COMPRESSED); - } - fn is_multi(&self, db_version: u32) -> bool { self.is_multipart() || self.is_multihead() || @@ -279,13 +258,6 @@ impl + AsMut<[u8]>> Entry { self.0 += SIZE_SIZE; } - fn write_size(&mut self, mut size: u16, compressed: bool) { - if compressed { - size |= 
COMPRESSED_MASK; - } - self.write_slice(&size.to_le_bytes()); - } - pub fn read_u64(&mut self) -> u64 { u64::from_le_bytes(self.read_slice(8).try_into().unwrap()) } @@ -302,30 +274,14 @@ impl + AsMut<[u8]>> Entry { self.skip_u64() } - pub fn write_u64(&mut self, next_index: u64) { - self.write_slice(&next_index.to_le_bytes()); - } - - fn write_next(&mut self, next_index: u64) { - self.write_u64(next_index) - } - pub fn read_u32(&mut self) -> u32 { u32::from_le_bytes(self.read_slice(REFS_SIZE).try_into().unwrap()) } - pub fn write_u32(&mut self, next_index: u32) { - self.write_slice(&next_index.to_le_bytes()); - } - fn read_rc(&mut self) -> u32 { self.read_u32() } - fn write_rc(&mut self, rc: u32) { - self.write_slice(&rc.to_le_bytes()); - } - fn read_partial(&mut self) -> &[u8] { self.read_slice(PARTIAL_SIZE) } @@ -333,6 +289,52 @@ impl + AsMut<[u8]>> Entry { fn remaining_to(&self, end: usize) -> &[u8] { &self.1.as_ref()[self.0..end] } +} + +impl + AsMut<[u8]>> Entry { + pub fn write_slice(&mut self, buf: &[u8]) { + let start = self.0; + self.0 += buf.len(); + self.1.as_mut()[start..self.0].copy_from_slice(buf); + } + + fn write_tombstone(&mut self) { + self.write_slice(TOMBSTONE); + } + + fn write_multipart(&mut self) { + self.write_slice(MULTIPART); + } + + fn write_multihead(&mut self) { + self.write_slice(MULTIHEAD); + } + + fn write_multihead_compressed(&mut self) { + self.write_slice(MULTIHEAD_COMPRESSED); + } + + fn write_size(&mut self, mut size: u16, compressed: bool) { + if compressed { + size |= COMPRESSED_MASK; + } + self.write_slice(&size.to_le_bytes()); + } + pub fn write_u64(&mut self, next_index: u64) { + self.write_slice(&next_index.to_le_bytes()); + } + + fn write_next(&mut self, next_index: u64) { + self.write_u64(next_index) + } + + pub fn write_u32(&mut self, next_index: u32) { + self.write_slice(&next_index.to_le_bytes()); + } + + fn write_rc(&mut self, rc: u32) { + self.write_slice(&rc.to_le_bytes()); + } pub fn inner_mut(&mut self) -> &mut B 
{ &mut self.1 @@ -345,7 +347,7 @@ impl + AsMut<[u8]>> AsMut<[u8]> for Entry { } } -impl + AsMut<[u8]>> AsRef<[u8]> for Entry { +impl> AsRef<[u8]> for Entry { fn as_ref(&self) -> &[u8] { self.1.as_ref() } @@ -434,14 +436,20 @@ impl ValueTable { log: &impl LogQuery, mut f: impl FnMut(&[u8]) -> bool, ) -> Result<(u32, bool)> { - let mut buf = FullEntry::new_uninit_full_entry(); let mut part = 0; let mut compressed = false; let mut rc = 1; let entry_size = self.entry_size as usize; loop { - let buf = if log.value(self.id, index, buf.as_mut()) { - &mut buf + let vbuf = log.value_ref(self.id, index); + let buf: &[u8] = if let Some(buf) = vbuf.as_deref() { + log::trace!( + target: "parity-db", + "{}: Found in overlay {}", + self.id, + index, + ); + buf } else { log::trace!( target: "parity-db", @@ -449,9 +457,9 @@ impl ValueTable { self.id, index, ); - self.file.read_at(&mut buf[0..entry_size], index * self.entry_size as u64)?; - &mut buf + self.file.slice_at(index * self.entry_size as u64, entry_size) }; + let mut buf = EntryRef::new(buf); buf.set_offset(0); @@ -485,11 +493,11 @@ impl ValueTable { } match key { TableKeyQuery::Fetch(Some(to_fetch)) => { - **to_fetch = TableKey::fetch_partial(buf)?; + **to_fetch = TableKey::fetch_partial(&mut buf)?; }, TableKeyQuery::Fetch(None) => (), TableKeyQuery::Check(k) => { - let to_fetch = k.fetch(buf)?; + let to_fetch = k.fetch(&mut buf)?; if !k.compare(&to_fetch) { log::debug!( target: "parity-db", @@ -1083,7 +1091,7 @@ impl ValueTable { } pub mod key { - use super::FullEntry; + use super::{FullEntry, EntryRef}; use crate::{Key, Result}; pub const PARTIAL_SIZE: usize = 26; @@ -1117,7 +1125,7 @@ pub mod key { } } - pub fn fetch_partial(buf: &mut FullEntry) -> Result<[u8; PARTIAL_SIZE]> { + pub fn fetch_partial<'a>(buf: &mut EntryRef<'a>) -> Result<[u8; PARTIAL_SIZE]> { let mut result = [0u8; PARTIAL_SIZE]; if buf.1.len() >= PARTIAL_SIZE { let pks = buf.read_partial(); @@ -1127,7 +1135,7 @@ pub mod key { 
Err(crate::error::Error::InvalidValueData) } - pub fn fetch(&self, buf: &mut FullEntry) -> Result> { + pub fn fetch<'a>(&self, buf: &mut EntryRef<'a>) -> Result> { match self { TableKey::Partial(_k) => Ok(Some(Self::fetch_partial(buf)?)), TableKey::NoHash => Ok(None), From 80eac43f40a7104de41d2a8fca144d3104d13023 Mon Sep 17 00:00:00 2001 From: arkpar Date: Tue, 18 Jul 2023 11:38:51 +0200 Subject: [PATCH 03/17] Reclaim overlay mem --- src/column.rs | 2 +- src/file.rs | 12 ++--- src/index.rs | 12 ++++- src/log.rs | 130 ++++++++++++++++++++++++++++++++++++-------------- src/table.rs | 14 ++++-- 5 files changed, 119 insertions(+), 51 deletions(-) diff --git a/src/column.rs b/src/column.rs index 53521cfd..36c1ebb8 100644 --- a/src/column.rs +++ b/src/column.rs @@ -27,7 +27,7 @@ use std::{ }, }; -const MIN_INDEX_BITS: u8 = 16; +pub const MIN_INDEX_BITS: u8 = 16; // Measured in index entries const MAX_REINDEX_BATCH: usize = 8192; diff --git a/src/file.rs b/src/file.rs index 9f68df3d..13304d63 100644 --- a/src/file.rs +++ b/src/file.rs @@ -39,11 +39,7 @@ fn disable_read_ahead(_file: &std::fs::File) -> std::io::Result<()> { #[cfg(unix)] pub fn madvise_random(map: &mut memmap2::MmapMut) { unsafe { - libc::madvise( - map.as_mut_ptr() as _, - map.len(), - libc::MADV_RANDOM, - ); + libc::madvise(map.as_mut_ptr() as _, map.len(), libc::MADV_RANDOM); } } @@ -150,13 +146,13 @@ impl TableFile { let mut map = try_io!(unsafe { memmap2::MmapMut::map_mut(&file) }); madvise_random(&mut map); *map_and_file = Some((map, file)); - } + }, Some((map, file)) => { try_io!(file.set_len(capacity * entry_size as u64)); - let mut m = try_io!(unsafe { memmap2::MmapMut::map_mut(&*file) }); + let mut m = try_io!(unsafe { memmap2::MmapMut::map_mut(&*file) }); madvise_random(&mut m); *map = m; - } + }, } Ok(()) } diff --git a/src/index.rs b/src/index.rs index 5108141c..378ccf5b 100644 --- a/src/index.rs +++ b/src/index.rs @@ -2,14 +2,14 @@ // This file is dual-licensed as Apache-2.0 or MIT. 
use crate::{ - column::ColId, + column::{ColId, MIN_INDEX_BITS}, display::hex, error::{try_io, Error, Result}, + file::madvise_random, log::{LogQuery, LogReader, LogWriter}, parking_lot::{RwLock, RwLockUpgradableReadGuard, RwLockWriteGuard}, stats::{self, ColumnStats}, table::{key::TableKey, SIZE_TIERS_BITS}, - file::madvise_random, Key, }; #[cfg(target_arch = "x86")] @@ -182,6 +182,14 @@ impl TableId { pub fn total_entries(&self) -> u64 { total_entries(self.index_bits()) } + + pub fn log_index(&self) -> usize { + self.col() as usize * (64 - MIN_INDEX_BITS) as usize + self.index_bits() as usize + } + + pub const fn max_log_indicies(num_columns: usize) -> usize { + (64 - MIN_INDEX_BITS) as usize * num_columns + } } impl std::fmt::Display for TableId { diff --git a/src/log.rs b/src/log.rs index 6b057186..69bed177 100644 --- a/src/log.rs +++ b/src/log.rs @@ -1,7 +1,7 @@ // Copyright 2021-2022 Parity Technologies (UK) Ltd. // This file is dual-licensed as Apache-2.0 or MIT. -use parking_lot::{RwLockReadGuard, MappedRwLockReadGuard}; +use parking_lot::{MappedRwLockReadGuard, RwLockReadGuard}; use crate::{ column::ColId, @@ -26,6 +26,9 @@ const INSERT_VALUE: u8 = 3; const END_RECORD: u8 = 4; const DROP_TABLE: u8 = 5; +const MAX_INDEX_OVERLAY_CAPACITY_ITEMS: usize = 1024; +const MAX_VALUE_OVERLAY_CAPACITY_ITEMS: usize = 65536; + #[derive(Debug)] pub struct InsertIndexAction { pub table: IndexTableId, @@ -48,7 +51,9 @@ pub enum LogAction { } pub trait LogQuery { - type ValueRef<'a>: std::ops::Deref where Self: 'a; + type ValueRef<'a>: std::ops::Deref + where + Self: 'a; fn with_index R>( &self, @@ -60,16 +65,28 @@ pub trait LogQuery { fn value_ref<'a>(&'a self, table: ValueTableId, index: u64) -> Option>; } -#[derive(Default, Debug)] +#[derive(Debug)] pub struct LogOverlays { - index: HashMap, - value: HashMap, - last_record_id: HashMap, + index: Vec, + value: Vec, + last_record_ids: Vec, } impl LogOverlays { pub fn last_record_id(&self, col: ColId) -> u64 { - 
self.last_record_id.get(&col).cloned().unwrap_or(u64::MAX) + self.last_record_ids.get(col as usize).cloned().unwrap_or(u64::MAX) + } + + pub fn with_columns(count: usize) -> Self { + Self { + index: (0..IndexTableId::max_log_indicies(count)) + .map(|_| IndexLogOverlay::default()) + .collect(), + value: (0..ValueTableId::max_log_tables(count)) + .map(|_| ValueLogOverlay::default()) + .collect(), + last_record_ids: (0..count).map(|_| 0).collect(), + } } } @@ -104,13 +121,16 @@ impl LogQuery for LogOverlays { f: F, ) -> Option { self.index - .get(&table) + .get(table.log_index()) .and_then(|o| o.map.get(&index).map(|(_id, _mask, data)| f(data))) } fn value(&self, table: ValueTableId, index: u64, dest: &mut [u8]) -> bool { let s = self; - if let Some(d) = s.value.get(&table).and_then(|o| o.map.get(&index).map(|(_id, data)| data)) + if let Some(d) = s + .value + .get(table.log_index()) + .and_then(|o| o.map.get(&index).map(|(_id, data)| data)) { let len = dest.len().min(d.len()); dest[0..len].copy_from_slice(&d[0..len]); @@ -120,8 +140,9 @@ impl LogQuery for LogOverlays { } } fn value_ref<'a>(&'a self, table: ValueTableId, index: u64) -> Option> { - log::debug!(target: "parity-db", "Query overlay index {}, record {}, size={}/{}", table, index, self.value.len(), self.value.get(&table).map_or(0, |o| o.map.len())); - self.value.get(&table).and_then(|o| o.map.get(&index).map(|(_id, data)| data.as_slice())) + self.value + .get(table.log_index()) + .and_then(|o| o.map.get(&index).map(|(_id, data)| data.as_slice())) } } @@ -258,7 +279,7 @@ impl<'a> LogReader<'a> { #[derive(Debug)] pub struct LogChange { local_index: HashMap, - local_values: HashMap, + local_values: HashMap, record_id: u64, dropped_tables: Vec, } @@ -273,7 +294,7 @@ impl LogChange { } } - pub fn local_values_changes(&self, id: ValueTableId) -> Option<&ValueLogOverlay> { + pub fn local_values_changes(&self, id: ValueTableId) -> Option<&ValueLogOverlayLocal> { self.local_values.get(&id) } @@ -330,7 +351,7 @@ impl 
LogChange { #[derive(Debug)] struct FlushedLog { index: HashMap, - values: HashMap, + values: HashMap, bytes: u64, } @@ -378,23 +399,23 @@ impl<'a> LogWriter<'a> { } } -pub enum LogWriterLock<'a> { +pub enum LogWriterValueGuard<'a> { Local(&'a [u8]), Overlay(MappedRwLockReadGuard<'a, [u8]>), } -impl std::ops::Deref for LogWriterLock<'_> { +impl std::ops::Deref for LogWriterValueGuard<'_> { type Target = [u8]; fn deref(&self) -> &[u8] { match self { - LogWriterLock::Local(data) => data, - LogWriterLock::Overlay(data) => data.deref(), + LogWriterValueGuard::Local(data) => data, + LogWriterValueGuard::Overlay(data) => data.deref(), } } } impl<'q> LogQuery for LogWriter<'q> { - type ValueRef<'a> = LogWriterLock<'a> where Self: 'a; + type ValueRef<'a> = LogWriterValueGuard<'a> where Self: 'a; fn with_index R>( &self, table: IndexTableId, @@ -417,7 +438,7 @@ impl<'q> LogQuery for LogWriter<'q> { .log .local_values .get(&table) - .and_then(|o| o.map.get(&index).map(|(_id, data)| data)) + .and_then(|o| o.map.get(&index).map(|(_id, data)| data)) { let len = dest.len().min(d.len()); dest[0..len].copy_from_slice(&d[0..len]); @@ -426,14 +447,18 @@ impl<'q> LogQuery for LogWriter<'q> { self.overlays.value(table, index, dest) } } - fn value_ref<'v> (&'v self, table: ValueTableId, index: u64) -> Option> { - log::debug!(target: "parity-db", "Query local overlay index {}, record {}", table, index); - self - .log + fn value_ref<'v>(&'v self, table: ValueTableId, index: u64) -> Option> { + self.log .local_values .get(&table) - .and_then(|o| o.map.get(&index).map(|(_id, data)| LogWriterLock::Local(data.as_slice()))) - .or_else(|| self.overlays.value_ref(table, index).map(|data| LogWriterLock::Overlay(data))) + .and_then(|o| { + o.map.get(&index).map(|(_id, data)| LogWriterValueGuard::Local(data.as_slice())) + }) + .or_else(|| { + self.overlays + .value_ref(table, index) + .map(|data| LogWriterValueGuard::Overlay(data)) + }) } } @@ -489,6 +514,10 @@ pub struct IndexLogOverlay { // We 
use identity hash for value overlay/log records so that writes to value tables are in order. #[derive(Debug, Default)] pub struct ValueLogOverlay { + pub map: HashMap)>, // index -> (record_id, entry) +} +#[derive(Debug, Default)] +pub struct ValueLogOverlayLocal { pub map: HashMap), BuildIdHash>, // index -> (record_id, entry) } @@ -552,7 +581,7 @@ impl Log { let next_log_id = if logs.is_empty() { 0 } else { max_log_id + 1 }; Ok(Log { - overlays: Default::default(), + overlays: RwLock::new(LogOverlays::with_columns(options.columns.len())), appending: RwLock::new(None), reading: RwLock::new(None), read_queue: RwLock::default(), @@ -617,9 +646,15 @@ impl Log { self.cleanup_queue.write().push_back((id, file)); } let mut overlays = self.overlays.write(); - overlays.index.clear(); - overlays.value.clear(); - overlays.last_record_id.clear(); + for o in overlays.index.iter_mut() { + o.map.clear(); + } + for o in overlays.value.iter_mut() { + o.map.clear(); + } + for r in overlays.last_record_ids.iter_mut() { + *r = 0; + } self.dirty.store(false, Ordering::Relaxed); } @@ -657,13 +692,13 @@ impl Log { let mut total_index = 0; for (id, overlay) in index.into_iter() { total_index += overlay.map.len(); - overlays.index.entry(id).or_default().map.extend(overlay.map.into_iter()); + overlays.index[id.log_index()].map.extend(overlay.map.into_iter()); } let mut total_value = 0; for (id, overlay) in values.into_iter() { total_value += overlay.map.len(); - overlays.last_record_id.insert(id.col(), record_id); - overlays.value.entry(id).or_default().map.extend(overlay.map.into_iter()); + overlays.last_record_ids[id.col() as usize] = record_id; + overlays.value[id.log_index()].map.extend(overlay.map.into_iter()); } log::debug!( @@ -684,7 +719,7 @@ impl Log { } let mut overlays = self.overlays.write(); for (table, index) in cleared.index.into_iter() { - if let Some(ref mut overlay) = overlays.index.get_mut(&table) { + if let Some(ref mut overlay) = 
overlays.index.get_mut(table.log_index()) { if let std::collections::hash_map::Entry::Occupied(e) = overlay.map.entry(index) { if e.get().0 == record_id { e.remove_entry(); @@ -693,7 +728,7 @@ impl Log { } } for (table, index) in cleared.values.into_iter() { - if let Some(ref mut overlay) = overlays.value.get_mut(&table) { + if let Some(ref mut overlay) = overlays.value.get_mut(table.log_index()) { if let std::collections::hash_map::Entry::Occupied(e) = overlay.map.entry(index) { if e.get().0 == record_id { e.remove_entry(); @@ -701,8 +736,29 @@ impl Log { } } } - // Cleanup index overlays - overlays.index.retain(|_, overlay| !overlay.map.is_empty()); + // Reclaim overlay memory + for o in overlays.index.iter_mut() { + if o.map.is_empty() && o.map.capacity() > MAX_INDEX_OVERLAY_CAPACITY_ITEMS { + log::info!( + "Schrinking index {}/{} -> {}", + o.map.len(), + o.map.capacity(), + MAX_INDEX_OVERLAY_CAPACITY_ITEMS + ); + o.map.shrink_to_fit(); + } + } + for o in overlays.value.iter_mut() { + if o.map.is_empty() && o.map.capacity() > MAX_VALUE_OVERLAY_CAPACITY_ITEMS { + log::info!( + "Schrinking value {}/{} -> {}", + o.map.len(), + o.map.capacity(), + MAX_VALUE_OVERLAY_CAPACITY_ITEMS + ); + o.map.shrink_to_fit(); + } + } } pub fn flush_one(&self, min_size: u64) -> Result { diff --git a/src/table.rs b/src/table.rs index 73f9c638..fc36fcb5 100644 --- a/src/table.rs +++ b/src/table.rs @@ -48,7 +48,7 @@ use crate::{ column::ColId, display::hex, error::Result, - log::{LogQuery, LogReader, LogWriter}, + log::{LogOverlays, LogQuery, LogReader, LogWriter}, options::ColumnOptions as Options, parking_lot::RwLock, table::key::{TableKey, TableKeyQuery, PARTIAL_SIZE}, @@ -116,6 +116,14 @@ impl TableId { pub fn as_u16(&self) -> u16 { self.0 } + + pub fn log_index(&self) -> usize { + self.col() as usize * SIZE_TIERS + self.size_tier() as usize + } + + pub const fn max_log_tables(num_columns: usize) -> usize { + SIZE_TIERS * num_columns + } } impl std::fmt::Display for TableId { @@ 
-1055,7 +1063,7 @@ impl ValueTable { fn do_init_with_entry(&self, entry: &[u8]) -> Result<()> { self.file.grow(self.entry_size)?; - let empty_overlays = RwLock::new(Default::default()); + let empty_overlays = RwLock::new(LogOverlays::with_columns(0)); let mut log = LogWriter::new(&empty_overlays, 0); let at = self.overwrite_chain(&TableKey::NoHash, entry, &mut log, None, false)?; self.complete_plan(&mut log)?; @@ -1091,7 +1099,7 @@ } pub mod key { - use super::{FullEntry, EntryRef}; + use super::{EntryRef, FullEntry}; use crate::{Key, Result}; pub const PARTIAL_SIZE: usize = 26; From 4523ab92413d2d116b4dcfda919507e033ae021e Mon Sep 17 00:00:00 2001 From: arkpar Date: Tue, 18 Jul 2023 13:43:03 +0200 Subject: [PATCH 04/17] Tweak reclaim params --- src/index.rs | 6 ++++++ src/log.rs | 30 ++++++++++++++++++------------ src/table.rs | 6 ++++++ 3 files changed, 30 insertions(+), 12 deletions(-) diff --git a/src/index.rs b/src/index.rs index 378ccf5b..a022f1a2 100644 --- a/src/index.rs +++ b/src/index.rs @@ -187,6 +187,12 @@ impl TableId { self.col() as usize * (64 - MIN_INDEX_BITS) as usize + self.index_bits() as usize } + pub fn from_log_index(i: usize) -> Self { + let col = i / (64 - MIN_INDEX_BITS) as usize; + let bits = i % (64 - MIN_INDEX_BITS) as usize; + TableId::new(col as ColId, bits as u8) + } + pub const fn max_log_indicies(num_columns: usize) -> usize { (64 - MIN_INDEX_BITS) as usize * num_columns } diff --git a/src/log.rs b/src/log.rs index 69bed177..a8547d85 100644 --- a/src/log.rs +++ b/src/log.rs @@ -26,8 +26,12 @@ const INSERT_VALUE: u8 = 3; const END_RECORD: u8 = 4; const DROP_TABLE: u8 = 5; -const MAX_INDEX_OVERLAY_CAPACITY_ITEMS: usize = 1024; -const MAX_VALUE_OVERLAY_CAPACITY_ITEMS: usize = 65536; +// Once index overlay uses less than 1/10 of its capacity, it will be reclaimed. +const INDEX_OVERLAY_RECLAIM_FACTOR: usize = 10; +// Once value overlay uses less than 1/10 of its capacity, it will be reclaimed. 
+const VALUE_OVERLAY_RECLAIM_FACTOR: usize = 10; +// Min number of value items to initiate reclaim. Each item is around 40 bytes. +const VALUE_OVERLAY_MIN_RECLAIM_ITEMS: usize = 10240; #[derive(Debug)] pub struct InsertIndexAction { @@ -737,24 +741,26 @@ impl Log { } } // Reclaim overlay memory - for o in overlays.index.iter_mut() { - if o.map.is_empty() && o.map.capacity() > MAX_INDEX_OVERLAY_CAPACITY_ITEMS { - log::info!( - "Schrinking index {}/{} -> {}", + for (i, o) in overlays.index.iter_mut().enumerate() { + if o.map.capacity() > o.map.len() * INDEX_OVERLAY_RECLAIM_FACTOR { + log::trace!( + "Shrinking index overlay {}: {}/{}", + IndexTableId::from_log_index(i), o.map.len(), o.map.capacity(), - MAX_INDEX_OVERLAY_CAPACITY_ITEMS ); o.map.shrink_to_fit(); } } - for o in overlays.value.iter_mut() { - if o.map.is_empty() && o.map.capacity() > MAX_VALUE_OVERLAY_CAPACITY_ITEMS { - log::info!( - "Schrinking value {}/{} -> {}", + for (i, o) in overlays.value.iter_mut().enumerate() { + if o.map.capacity() > VALUE_OVERLAY_MIN_RECLAIM_ITEMS && + o.map.capacity() > o.map.len() * VALUE_OVERLAY_RECLAIM_FACTOR + { + log::trace!( + "Shrinking value overlay {}: {}/{}", + ValueTableId::from_log_index(i), o.map.len(), o.map.capacity(), - MAX_VALUE_OVERLAY_CAPACITY_ITEMS ); o.map.shrink_to_fit(); } diff --git a/src/table.rs b/src/table.rs index fc36fcb5..c75ebec0 100644 --- a/src/table.rs +++ b/src/table.rs @@ -124,6 +124,12 @@ impl TableId { pub const fn max_log_tables(num_columns: usize) -> usize { SIZE_TIERS * num_columns } + + pub fn from_log_index(i: usize) -> Self { + let col = i / SIZE_TIERS; + let tier = i % SIZE_TIERS; + Self::new(col as ColId, tier as u8) + } } impl std::fmt::Display for TableId { From 82b484c51f9f007c64871c51163247e70603e329 Mon Sep 17 00:00:00 2001 From: arkpar Date: Tue, 18 Jul 2023 13:43:37 +0200 Subject: [PATCH 05/17] Bump version --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 
2a07b5e8..c88cc96b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "parity-db" -version = "0.4.9" +version = "0.4.10" authors = ["Parity Technologies "] edition = "2021" license = "MIT OR Apache-2.0" From 7fbd11b784b09839b7cb839f034b1ddba92556bf Mon Sep 17 00:00:00 2001 From: arkpar Date: Tue, 18 Jul 2023 13:57:27 +0200 Subject: [PATCH 06/17] Style --- src/log.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/log.rs b/src/log.rs index a8547d85..1a9d0ac7 100644 --- a/src/log.rs +++ b/src/log.rs @@ -1,8 +1,6 @@ // Copyright 2021-2022 Parity Technologies (UK) Ltd. // This file is dual-licensed as Apache-2.0 or MIT. -use parking_lot::{MappedRwLockReadGuard, RwLockReadGuard}; - use crate::{ column::ColId, error::{try_io, Error, Result}, @@ -18,6 +16,7 @@ use std::{ io::{ErrorKind, Read, Seek, Write}, sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}, }; +use parking_lot::{MappedRwLockReadGuard, RwLockReadGuard}; const MAX_LOG_POOL_SIZE: usize = 16; const BEGIN_RECORD: u8 = 1; From a69d80eb084ef62fa083e76b11ebcebf305a992f Mon Sep 17 00:00:00 2001 From: arkpar Date: Tue, 18 Jul 2023 14:05:52 +0200 Subject: [PATCH 07/17] Style --- src/log.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/log.rs b/src/log.rs index 1a9d0ac7..c10d02f5 100644 --- a/src/log.rs +++ b/src/log.rs @@ -9,6 +9,7 @@ use crate::{ parking_lot::{RwLock, RwLockWriteGuard}, table::TableId as ValueTableId, }; +use parking_lot::{MappedRwLockReadGuard, RwLockReadGuard}; use std::{ cmp::min, collections::{HashMap, VecDeque}, @@ -16,7 +17,6 @@ use std::{ io::{ErrorKind, Read, Seek, Write}, sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}, }; -use parking_lot::{MappedRwLockReadGuard, RwLockReadGuard}; const MAX_LOG_POOL_SIZE: usize = 16; const BEGIN_RECORD: u8 = 1; From 92425794ddf131f65b35f88b976756eb80b12592 Mon Sep 17 00:00:00 2001 From: arkpar Date: Tue, 18 Jul 2023 21:50:34 +0200 Subject: [PATCH 08/17] Loom 
workaround --- src/log.rs | 42 ++++++++++++++++++++++++++++++++++++++---- 1 file changed, 38 insertions(+), 4 deletions(-) diff --git a/src/log.rs b/src/log.rs index c10d02f5..97b1f15d 100644 --- a/src/log.rs +++ b/src/log.rs @@ -9,7 +9,6 @@ use crate::{ parking_lot::{RwLock, RwLockWriteGuard}, table::TableId as ValueTableId, }; -use parking_lot::{MappedRwLockReadGuard, RwLockReadGuard}; use std::{ cmp::min, collections::{HashMap, VecDeque}, @@ -93,8 +92,37 @@ impl LogOverlays { } } + +#[cfg(feature = "loom")] +pub struct MappedBytesGuard<'a> { + _phantom: std::marker::PhantomData<&'a ()>, + data: Vec, +} + +#[cfg(feature = "loom")] +impl <'a> MappedBytesGuard<'a> { + fn new(data: Vec) -> Self { + Self { + _phantom: std::marker::PhantomData, + data, + } + } +} + +#[cfg(feature = "loom")] +impl <'a> std::ops::Deref for MappedBytesGuard<'a> { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + self.data.as_slice() + } +} + +#[cfg(not(feature = "loom"))] +type MappedBytesGuard<'a> = parking_lot::MappedRwLockReadGuard<'a, [u8]>; + impl LogQuery for RwLock { - type ValueRef<'a> = MappedRwLockReadGuard<'a, [u8]>; + type ValueRef<'a> = MappedBytesGuard<'a>; fn with_index R>( &self, @@ -109,10 +137,16 @@ impl LogQuery for RwLock { (&*self.read()).value(table, index, dest) } + #[cfg(not(feature = "loom"))] fn value_ref<'a>(&'a self, table: ValueTableId, index: u64) -> Option> { - let lock = RwLockReadGuard::try_map(self.read(), |o| o.value_ref(table, index)); + let lock = parking_lot::RwLockReadGuard::try_map(self.read(), |o| o.value_ref(table, index)); lock.ok() } + + #[cfg(feature = "loom")] + fn value_ref<'a>(&'a self, table: ValueTableId, index: u64) -> Option> { + self.read().value_ref(table, index).map(|o| MappedBytesGuard::new(o.to_vec())) + } } impl LogQuery for LogOverlays { @@ -404,7 +438,7 @@ impl<'a> LogWriter<'a> { pub enum LogWriterValueGuard<'a> { Local(&'a [u8]), - Overlay(MappedRwLockReadGuard<'a, [u8]>), + Overlay(MappedBytesGuard<'a>), } impl 
std::ops::Deref for LogWriterValueGuard<'_> { From e5b029e629ba439649713a98bcc22fd512cc3e52 Mon Sep 17 00:00:00 2001 From: arkpar Date: Tue, 18 Jul 2023 21:52:09 +0200 Subject: [PATCH 09/17] Loom workaround --- src/log.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/log.rs b/src/log.rs index 97b1f15d..15eed276 100644 --- a/src/log.rs +++ b/src/log.rs @@ -93,6 +93,7 @@ impl LogOverlays { } +// Loom is missing support for guard projection, so we copy the data as a workaround. #[cfg(feature = "loom")] pub struct MappedBytesGuard<'a> { _phantom: std::marker::PhantomData<&'a ()>, From 543684212f927b9d100e797e475a0519b5257250 Mon Sep 17 00:00:00 2001 From: arkpar Date: Tue, 18 Jul 2023 21:53:19 +0200 Subject: [PATCH 10/17] fmt --- src/log.rs | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/src/log.rs b/src/log.rs index 15eed276..882b7ea3 100644 --- a/src/log.rs +++ b/src/log.rs @@ -92,26 +92,22 @@ impl LogOverlays { } } - // Loom is missing support for guard projection, so we copy the data as a workaround. 
#[cfg(feature = "loom")] -pub struct MappedBytesGuard<'a> { +pub struct MappedBytesGuard<'a> { _phantom: std::marker::PhantomData<&'a ()>, data: Vec, } #[cfg(feature = "loom")] -impl <'a> MappedBytesGuard<'a> { +impl<'a> MappedBytesGuard<'a> { fn new(data: Vec) -> Self { - Self { - _phantom: std::marker::PhantomData, - data, - } + Self { _phantom: std::marker::PhantomData, data } } } #[cfg(feature = "loom")] -impl <'a> std::ops::Deref for MappedBytesGuard<'a> { +impl<'a> std::ops::Deref for MappedBytesGuard<'a> { type Target = [u8]; fn deref(&self) -> &Self::Target { @@ -140,7 +136,8 @@ impl LogQuery for RwLock { #[cfg(not(feature = "loom"))] fn value_ref<'a>(&'a self, table: ValueTableId, index: u64) -> Option> { - let lock = parking_lot::RwLockReadGuard::try_map(self.read(), |o| o.value_ref(table, index)); + let lock = + parking_lot::RwLockReadGuard::try_map(self.read(), |o| o.value_ref(table, index)); lock.ok() } From b6e3ac9b5841e2a069dc6351a8a040c25275cedd Mon Sep 17 00:00:00 2001 From: arkpar Date: Tue, 18 Jul 2023 21:59:03 +0200 Subject: [PATCH 11/17] Destroy old map first --- src/file.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/file.rs b/src/file.rs index 13304d63..185f7ac0 100644 --- a/src/file.rs +++ b/src/file.rs @@ -139,7 +139,7 @@ impl TableFile { self.capacity.store(capacity, Ordering::Relaxed); let mut map_and_file = self.map.write(); - match map_and_file.as_mut() { + match map_and_file.take() { None => { let file = self.create_file()?; try_io!(file.set_len(capacity * entry_size as u64)); @@ -147,11 +147,11 @@ impl TableFile { madvise_random(&mut map); *map_and_file = Some((map, file)); }, - Some((map, file)) => { + Some((_map, file)) => { try_io!(file.set_len(capacity * entry_size as u64)); - let mut m = try_io!(unsafe { memmap2::MmapMut::map_mut(&*file) }); + let mut m = try_io!(unsafe { memmap2::MmapMut::map_mut(&file) }); madvise_random(&mut m); - *map = m; + *map_and_file = Some((m, file)); }, } Ok(()) From 
ebc75c5c21b379b18e76de0a806ba8e9f8645aaf Mon Sep 17 00:00:00 2001 From: arkpar Date: Tue, 18 Jul 2023 22:15:15 +0200 Subject: [PATCH 12/17] Really destroy old map first --- src/file.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/file.rs b/src/file.rs index 185f7ac0..bc373efa 100644 --- a/src/file.rs +++ b/src/file.rs @@ -147,7 +147,8 @@ impl TableFile { madvise_random(&mut map); *map_and_file = Some((map, file)); }, - Some((_map, file)) => { + Some((map, file)) => { + std::mem::drop(map); try_io!(file.set_len(capacity * entry_size as u64)); let mut m = try_io!(unsafe { memmap2::MmapMut::map_mut(&file) }); madvise_random(&mut m); From e40f6576d344b4eb0b2b4f4acfe6116448d4e127 Mon Sep 17 00:00:00 2001 From: arkpar Date: Wed, 19 Jul 2023 14:48:57 +0200 Subject: [PATCH 13/17] Reserve address space for the file mapping --- src/file.rs | 32 ++++++++++++++++++++++++-------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/src/file.rs b/src/file.rs index bc373efa..b919ee74 100644 --- a/src/file.rs +++ b/src/file.rs @@ -10,6 +10,8 @@ use crate::{ }; use std::sync::atomic::{AtomicU64, Ordering}; +const RESERVE_ADDRESS_SPACE: usize = 1024 * 1024 * 1024; + #[cfg(target_os = "linux")] fn disable_read_ahead(file: &std::fs::File) -> std::io::Result<()> { use std::os::unix::io::AsRawFd; @@ -56,6 +58,10 @@ pub struct TableFile { pub id: TableId, } +fn map_len(file_len: u64) -> usize { + file_len as usize + RESERVE_ADDRESS_SPACE +} + impl TableFile { pub fn open(filepath: std::path::PathBuf, entry_size: u16, id: TableId) -> Result { let mut capacity = 0u64; @@ -73,7 +79,8 @@ impl TableFile { } else { capacity = len / entry_size as u64; } - let mut map = try_io!(unsafe { memmap2::MmapMut::map_mut(&file) }); + let mut map = + try_io!(unsafe { memmap2::MmapOptions::new().len(map_len(len)).map_mut(&file) }); madvise_random(&mut map); Some((map, file)) } else { @@ -139,20 +146,29 @@ impl TableFile { self.capacity.store(capacity, Ordering::Relaxed); 
let mut map_and_file = self.map.write(); - match map_and_file.take() { + match map_and_file.as_mut() { None => { let file = self.create_file()?; try_io!(file.set_len(capacity * entry_size as u64)); - let mut map = try_io!(unsafe { memmap2::MmapMut::map_mut(&file) }); + let mut map = try_io!(unsafe { + memmap2::MmapOptions::new().len(RESERVE_ADDRESS_SPACE).map_mut(&file) + }); madvise_random(&mut map); *map_and_file = Some((map, file)); }, Some((map, file)) => { - std::mem::drop(map); - try_io!(file.set_len(capacity * entry_size as u64)); - let mut m = try_io!(unsafe { memmap2::MmapMut::map_mut(&file) }); - madvise_random(&mut m); - *map_and_file = Some((m, file)); + let new_len = capacity * entry_size as u64; + try_io!(file.set_len(new_len)); + if map.len() < new_len as usize { + let mut new_map = try_io!(unsafe { + memmap2::MmapOptions::new().len(map_len(new_len)).map_mut(&*file) + }); + madvise_random(&mut new_map); + let old_map = std::mem::replace(map, new_map); + try_io!(old_map.flush()); + // Leak the old mapping as there might be concurrent readers. 
+ std::mem::forget(old_map); + } }, } Ok(()) From baa49ac1c66586602bcb3e344715a1e0bf7e9691 Mon Sep 17 00:00:00 2001 From: arkpar Date: Wed, 19 Jul 2023 21:20:40 +0200 Subject: [PATCH 14/17] aarch64 CI test --- src/file.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/file.rs b/src/file.rs index b919ee74..3b4e1db8 100644 --- a/src/file.rs +++ b/src/file.rs @@ -10,7 +10,7 @@ use crate::{ }; use std::sync::atomic::{AtomicU64, Ordering}; -const RESERVE_ADDRESS_SPACE: usize = 1024 * 1024 * 1024; +const RESERVE_ADDRESS_SPACE: usize = 64 * 1024 * 1024; #[cfg(target_os = "linux")] fn disable_read_ahead(file: &std::fs::File) -> std::io::Result<()> { From 7467d3ec8dec4814cac38c1090ba9c668281eda0 Mon Sep 17 00:00:00 2001 From: arkpar Date: Wed, 19 Jul 2023 21:36:01 +0200 Subject: [PATCH 15/17] aarch64 CI test --- src/file.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/file.rs b/src/file.rs index 3b4e1db8..644ba751 100644 --- a/src/file.rs +++ b/src/file.rs @@ -10,7 +10,7 @@ use crate::{ }; use std::sync::atomic::{AtomicU64, Ordering}; -const RESERVE_ADDRESS_SPACE: usize = 64 * 1024 * 1024; +const RESERVE_ADDRESS_SPACE: usize = 512 * 1024 * 1024; #[cfg(target_os = "linux")] fn disable_read_ahead(file: &std::fs::File) -> std::io::Result<()> { From bc2b7d215429829b4e099a7073e4ee03b5b75ec6 Mon Sep 17 00:00:00 2001 From: arkpar Date: Wed, 19 Jul 2023 22:20:22 +0200 Subject: [PATCH 16/17] Align file size --- src/file.rs | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/src/file.rs b/src/file.rs index 644ba751..b61e27c7 100644 --- a/src/file.rs +++ b/src/file.rs @@ -75,7 +75,7 @@ impl TableFile { if len == 0 { // Preallocate. 
capacity += GROW_SIZE_BYTES / entry_size as u64; - try_io!(file.set_len(capacity * entry_size as u64)); + try_io!(file.set_len(GROW_SIZE_BYTES)); } else { capacity = len / entry_size as u64; } @@ -141,23 +141,21 @@ impl TableFile { } pub fn grow(&self, entry_size: u16) -> Result<()> { - let mut capacity = self.capacity.load(Ordering::Relaxed); - capacity += GROW_SIZE_BYTES / entry_size as u64; - - self.capacity.store(capacity, Ordering::Relaxed); let mut map_and_file = self.map.write(); - match map_and_file.as_mut() { + let new_len = match map_and_file.as_mut() { None => { let file = self.create_file()?; - try_io!(file.set_len(capacity * entry_size as u64)); + let len = GROW_SIZE_BYTES; + try_io!(file.set_len(len)); let mut map = try_io!(unsafe { memmap2::MmapOptions::new().len(RESERVE_ADDRESS_SPACE).map_mut(&file) }); madvise_random(&mut map); *map_and_file = Some((map, file)); + len }, Some((map, file)) => { - let new_len = capacity * entry_size as u64; + let new_len = try_io!(file.metadata()).len() + GROW_SIZE_BYTES; try_io!(file.set_len(new_len)); if map.len() < new_len as usize { let mut new_map = try_io!(unsafe { @@ -169,8 +167,11 @@ impl TableFile { // Leak the old mapping as there might be concurrent readers. 
std::mem::forget(old_map); } + new_len }, - } + }; + let capacity = new_len / entry_size as u64; + self.capacity.store(capacity, Ordering::Relaxed); Ok(()) } From 878a2d50b01bc80fe3f73710fe271b1cab285470 Mon Sep 17 00:00:00 2001 From: arkpar Date: Wed, 19 Jul 2023 23:17:49 +0200 Subject: [PATCH 17/17] aarch64 CI test fix --- src/file.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/file.rs b/src/file.rs index b61e27c7..34c142fa 100644 --- a/src/file.rs +++ b/src/file.rs @@ -10,7 +10,12 @@ use crate::{ }; use std::sync::atomic::{AtomicU64, Ordering}; -const RESERVE_ADDRESS_SPACE: usize = 512 * 1024 * 1024; +#[cfg(not(test))] +const RESERVE_ADDRESS_SPACE: usize = 1024 * 1024 * 1024; // 1 Gb + +// Use different value for tests to work around docker limits on the test machine. +#[cfg(test)] +const RESERVE_ADDRESS_SPACE: usize = 64 * 1024 * 1024; // 64 Mb #[cfg(target_os = "linux")] fn disable_read_ahead(file: &std::fs::File) -> std::io::Result<()> {