Use mmap IO for value tables (#214)
* Experimental mmap IO for tables

* Eliminate extra copy on read

* Reclaim overlay mem

* Tweak reclaim params

* Bump version

* Style

* Style

* Loom workaround

* Loom workaround

* fmt

* Destroy old map first

* Really destroy old map first

* Reserve address space for the file mapping

* aarch64 CI test

* aarch64 CI test

* Align file size

* aarch64 CI test fix
arkpar committed Jul 21, 2023
1 parent edee3d1 commit 3034715
Showing 7 changed files with 356 additions and 222 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "parity-db"
version = "0.4.9"
version = "0.4.10"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2021"
license = "MIT OR Apache-2.0"
4 changes: 2 additions & 2 deletions admin/src/lib.rs
@@ -114,12 +114,12 @@ pub fn run() -> Result<(), String> {

let mut db_options = options.clone();
if args.compress {
for mut c in &mut db_options.columns {
for c in &mut db_options.columns {
c.compression = parity_db::CompressionType::Lz4;
}
}
if args.uniform {
for mut c in &mut db_options.columns {
for c in &mut db_options.columns {
c.uniform = true;
}
}
2 changes: 1 addition & 1 deletion src/column.rs
@@ -27,7 +27,7 @@ use std::{
},
};

const MIN_INDEX_BITS: u8 = 16;
pub const MIN_INDEX_BITS: u8 = 16;
// Measured in index entries
const MAX_REINDEX_BATCH: usize = 8192;

198 changes: 88 additions & 110 deletions src/file.rs
@@ -4,12 +4,19 @@
//! Utilities for db file.

use crate::{
error::{try_io, Error, Result},
parking_lot::{RwLock, RwLockUpgradableReadGuard, RwLockWriteGuard},
error::{try_io, Result},
parking_lot::RwLock,
table::TableId,
};
use std::sync::atomic::{AtomicU64, Ordering};

#[cfg(not(test))]
const RESERVE_ADDRESS_SPACE: usize = 1024 * 1024 * 1024; // 1 Gb

// Use a smaller value for tests to work around Docker limits on the test machine.
#[cfg(test)]
const RESERVE_ADDRESS_SPACE: usize = 64 * 1024 * 1024; // 64 Mb
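
The reservation works by creating a mapping larger than the file itself, so that ordinary growth only needs a set_len on the underlying file instead of a remap. A minimal sketch of the idea (hypothetical helper, not the crate's API; RESERVE plays the role of RESERVE_ADDRESS_SPACE above):

// Sketch only: map `file` together with RESERVE extra bytes of address
// space. Growing the file later via `set_len` keeps this mapping valid as
// long as the new length stays within the reserved region. Pages beyond
// the current file length must not be touched before the file is grown.
const RESERVE: usize = 1024 * 1024 * 1024;

fn map_with_reserve(file: &std::fs::File) -> std::io::Result<memmap2::MmapMut> {
	let len = file.metadata()?.len() as usize;
	unsafe { memmap2::MmapOptions::new().len(len + RESERVE).map_mut(file) }
}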

#[cfg(target_os = "linux")]
fn disable_read_ahead(file: &std::fs::File) -> std::io::Result<()> {
use std::os::unix::io::AsRawFd;
@@ -36,42 +43,34 @@ fn disable_read_ahead(_file: &std::fs::File) -> std::io::Result<()> {
Ok(())
}

// `File::sync_data` uses the F_FULLFSYNC fcntl on macOS. It is supposed to be
// the safest way to make sure data is fully persisted. However, starting from
// macOS 11.0 it severely degrades parallel write performance, even when writing to
// other files. Regular `fsync` is good enough for our use case.
// SSDs used in modern macs seem to be able to flush data even on unexpected power loss.
// We performed some testing with power shutdowns and kernel panics on both mac hardware
// and VMs and in all cases `fsync` was enough to prevent data corruption.
#[cfg(target_os = "macos")]
fn fsync(file: &std::fs::File) -> std::io::Result<()> {
use std::os::unix::io::AsRawFd;
if unsafe { libc::fsync(file.as_raw_fd()) } != 0 {
Err(std::io::Error::last_os_error())
} else {
Ok(())
#[cfg(unix)]
pub fn madvise_random(map: &mut memmap2::MmapMut) {
unsafe {
libc::madvise(map.as_mut_ptr() as _, map.len(), libc::MADV_RANDOM);
}
}

#[cfg(not(target_os = "macos"))]
fn fsync(file: &std::fs::File) -> std::io::Result<()> {
file.sync_data()
}
#[cfg(not(unix))]
pub fn madvise_random(_map: &mut memmap2::MmapMut) {}
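
madvise with MADV_RANDOM tells the kernel the mapping will be accessed in a random pattern, so aggressive read-ahead is not worthwhile; it is purely a hint, and its return value is ignored above. A stand-alone sketch of the same call with the error surfaced instead (illustrative helper, not part of the crate):

// Sketch: advise the kernel of random access on a mapping, reporting any
// madvise failure instead of ignoring it.
#[cfg(unix)]
fn advise_random_checked(map: &mut memmap2::MmapMut) -> std::io::Result<()> {
	let ret = unsafe { libc::madvise(map.as_mut_ptr() as _, map.len(), libc::MADV_RANDOM) };
	if ret != 0 {
		return Err(std::io::Error::last_os_error());
	}
	Ok(())
}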

const GROW_SIZE_BYTES: u64 = 256 * 1024;

#[derive(Debug)]
pub struct TableFile {
pub file: RwLock<Option<std::fs::File>>,
pub map: RwLock<Option<(memmap2::MmapMut, std::fs::File)>>,
pub path: std::path::PathBuf,
pub capacity: AtomicU64,
pub id: TableId,
}

fn map_len(file_len: u64) -> usize {
file_len as usize + RESERVE_ADDRESS_SPACE
}

impl TableFile {
pub fn open(filepath: std::path::PathBuf, entry_size: u16, id: TableId) -> Result<Self> {
let mut capacity = 0u64;
let file = if std::fs::metadata(&filepath).is_ok() {
let map = if std::fs::metadata(&filepath).is_ok() {
let file = try_io!(std::fs::OpenOptions::new()
.read(true)
.write(true)
@@ -81,17 +80,20 @@ impl TableFile {
if len == 0 {
// Preallocate.
capacity += GROW_SIZE_BYTES / entry_size as u64;
try_io!(file.set_len(capacity * entry_size as u64));
try_io!(file.set_len(GROW_SIZE_BYTES));
} else {
capacity = len / entry_size as u64;
}
Some(file)
let mut map =
try_io!(unsafe { memmap2::MmapOptions::new().len(map_len(len)).map_mut(&file) });
madvise_random(&mut map);
Some((map, file))
} else {
None
};
Ok(TableFile {
path: filepath,
file: RwLock::new(file),
map: RwLock::new(map),
capacity: AtomicU64::new(capacity),
id,
})
@@ -108,111 +110,87 @@ impl TableFile {
Ok(file)
}

#[cfg(unix)]
pub fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<()> {
use std::os::unix::fs::FileExt;
try_io!(self
.file
.read()
.as_ref()
.ok_or_else(|| Error::Corruption("File does not exist.".into()))?
.read_exact_at(buf, offset));
Ok(())
}

#[cfg(unix)]
pub fn write_at(&self, buf: &[u8], offset: u64) -> Result<()> {
use std::os::unix::fs::FileExt;
try_io!(self.file.read().as_ref().unwrap().write_all_at(buf, offset));
let offset = offset as usize;
let map = self.map.read();
let (map, _) = map.as_ref().unwrap();
buf.copy_from_slice(&map[offset..offset + buf.len()]);
Ok(())
}

#[cfg(windows)]
pub fn read_at(&self, mut buf: &mut [u8], mut offset: u64) -> Result<()> {
use crate::error::Error;
use std::{io, os::windows::fs::FileExt};

let file = self.file.read();
let file = file.as_ref().ok_or_else(|| Error::Corruption("File does not exist.".into()))?;

while !buf.is_empty() {
match file.seek_read(buf, offset) {
Ok(0) => break,
Ok(n) => {
buf = &mut buf[n..];
offset += n as u64;
},
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {
// Try again
},
Err(e) => return Err(Error::Io(e)),
}
}

if !buf.is_empty() {
Err(Error::Io(io::Error::new(
io::ErrorKind::UnexpectedEof,
"failed to fill whole buffer",
)))
} else {
Ok(())
}
pub fn slice_at(&self, offset: u64, len: usize) -> &[u8] {
let offset = offset as usize;
let map = self.map.read();
let (map, _) = map.as_ref().unwrap();
let data: &[u8] = unsafe {
let ptr = map.as_ptr().add(offset);
std::slice::from_raw_parts(ptr, len)
};
data
}
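
slice_at returns a zero-copy view straight into the mapping; the raw-parts construction deliberately drops the borrow of the lock guard, which is only sound because mappings are never unmapped while the table is alive (grow below leaks superseded maps). For contrast, a bounds-checked, copying read over the same mapping could look like this (sketch only, not the crate's API):

// Sketch: a safe, copying alternative to the zero-copy `slice_at`.
// Returns None instead of panicking when the range falls outside the map.
fn read_copy(map: &memmap2::MmapMut, offset: usize, buf: &mut [u8]) -> Option<()> {
	let end = offset.checked_add(buf.len())?;
	let src = map.get(offset..end)?;
	buf.copy_from_slice(src);
	Some(())
}

The crate takes the unsafe route to avoid the extra copy on hot read paths, which lines up with the "Eliminate extra copy on read" step in the commit message.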

#[cfg(windows)]
pub fn write_at(&self, mut buf: &[u8], mut offset: u64) -> Result<()> {
use crate::error::Error;
use std::{io, os::windows::fs::FileExt};

let file = self.file.read();
let file = file.as_ref().unwrap();

while !buf.is_empty() {
match file.seek_write(buf, offset) {
Ok(0) =>
return Err(Error::Io(io::Error::new(
io::ErrorKind::WriteZero,
"failed to write whole buffer",
))),
Ok(n) => {
buf = &buf[n..];
offset += n as u64;
},
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {
// Try again
},
Err(e) => return Err(Error::Io(e)),
}
}

pub fn write_at(&self, buf: &[u8], offset: u64) -> Result<()> {
let map = self.map.read();
let (map, _) = map.as_ref().unwrap();
let offset = offset as usize;

// Nasty mutable pointer cast. We ensure that any chunk being written here is only
// accessed through the overlay by other threads.
let ptr: *mut u8 = map.as_ptr() as *mut u8;
let data: &mut [u8] = unsafe {
let ptr = ptr.add(offset);
std::slice::from_raw_parts_mut(ptr, buf.len())
};
data.copy_from_slice(buf);
Ok(())
}
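
write_at writes through a *const-to-*mut pointer cast while holding only the read lock, so multiple writers can touch disjoint regions concurrently; correctness relies on the overlay ensuring no two threads access the same chunk at once. A hedged sketch of the conservative alternative, which takes exclusive access instead of casting:

// Sketch: the same write performed with exclusive access to the mapping,
// avoiding the pointer cast at the cost of serializing writers.
fn write_exclusive(map: &mut memmap2::MmapMut, offset: usize, buf: &[u8]) {
	map[offset..offset + buf.len()].copy_from_slice(buf);
}

Taking the write lock for every write would be simpler, but it would block unrelated readers and writers, which is why the crate accepts the unsafe cast here.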

pub fn grow(&self, entry_size: u16) -> Result<()> {
let mut capacity = self.capacity.load(Ordering::Relaxed);
capacity += GROW_SIZE_BYTES / entry_size as u64;

let mut map_and_file = self.map.write();
let new_len = match map_and_file.as_mut() {
None => {
let file = self.create_file()?;
let len = GROW_SIZE_BYTES;
try_io!(file.set_len(len));
let mut map = try_io!(unsafe {
memmap2::MmapOptions::new().len(RESERVE_ADDRESS_SPACE).map_mut(&file)
});
madvise_random(&mut map);
*map_and_file = Some((map, file));
len
},
Some((map, file)) => {
let new_len = try_io!(file.metadata()).len() + GROW_SIZE_BYTES;
try_io!(file.set_len(new_len));
if map.len() < new_len as usize {
let mut new_map = try_io!(unsafe {
memmap2::MmapOptions::new().len(map_len(new_len)).map_mut(&*file)
});
madvise_random(&mut new_map);
let old_map = std::mem::replace(map, new_map);
try_io!(old_map.flush());
// Leak the old mapping as there might be concurrent readers.
std::mem::forget(old_map);
}
new_len
},
};
let capacity = new_len / entry_size as u64;
self.capacity.store(capacity, Ordering::Relaxed);
let mut file = self.file.upgradable_read();
if file.is_none() {
let mut wfile = RwLockUpgradableReadGuard::upgrade(file);
*wfile = Some(self.create_file()?);
file = RwLockWriteGuard::downgrade_to_upgradable(wfile);
}
try_io!(file.as_ref().unwrap().set_len(capacity * entry_size as u64));
Ok(())
}
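
When a file outgrows even the reserved address space, grow builds a larger mapping and swaps it in; the superseded mapping is flushed and then intentionally leaked, because concurrent readers may still hold slices obtained from slice_at. A sketch of just that remap-and-leak step, using hypothetical helper names:

// Sketch: replace `current` with a larger mapping of `file` and leak the
// old mapping so that outstanding raw pointers into it remain valid for
// the life of the process. Leakage is bounded by how rarely a table
// outgrows its reserved address space.
fn remap_larger(
	current: &mut memmap2::MmapMut,
	file: &std::fs::File,
	new_map_len: usize,
) -> std::io::Result<()> {
	let new_map = unsafe { memmap2::MmapOptions::new().len(new_map_len).map_mut(file)? };
	let old_map = std::mem::replace(current, new_map);
	old_map.flush()?;
	std::mem::forget(old_map);
	Ok(())
}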

pub fn flush(&self) -> Result<()> {
if let Some(file) = self.file.read().as_ref() {
try_io!(fsync(file));
if let Some((map, _)) = self.map.read().as_ref() {
try_io!(map.flush());
}
Ok(())
}
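
With mmap IO, durability comes from flushing the mapping (msync) rather than calling fsync on a file descriptor, which is why the old fsync helpers above were removed. A minimal sketch of flushing several mapped tables (assumed helper, not part of the crate):

// Sketch: flush a set of mappings; MmapMut::flush performs a synchronous
// msync, returning once the dirty pages have been written back.
fn flush_all(maps: &[memmap2::MmapMut]) -> std::io::Result<()> {
	for map in maps {
		map.flush()?;
	}
	Ok(())
}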

pub fn remove(&self) -> Result<()> {
let mut file = self.file.write();
if let Some(file) = file.take() {
let mut map = self.map.write();
if let Some((map, file)) = map.take() {
drop(map);
drop(file);
try_io!(std::fs::remove_file(&self.path));
}
36 changes: 18 additions & 18 deletions src/index.rs
@@ -2,9 +2,10 @@
// This file is dual-licensed as Apache-2.0 or MIT.

use crate::{
column::ColId,
column::{ColId, MIN_INDEX_BITS},
display::hex,
error::{try_io, Error, Result},
file::madvise_random,
log::{LogQuery, LogReader, LogWriter},
parking_lot::{RwLock, RwLockUpgradableReadGuard, RwLockWriteGuard},
stats::{self, ColumnStats},
@@ -142,20 +143,6 @@ fn file_size(index_bits: u8) -> u64 {
total_entries(index_bits) * 8 + META_SIZE as u64
}

#[cfg(unix)]
fn madvise_random(id: TableId, map: &mut memmap2::MmapMut) {
unsafe {
libc::madvise(
map.as_mut_ptr() as _,
file_size(id.index_bits()) as usize,
libc::MADV_RANDOM,
);
}
}

#[cfg(not(unix))]
fn madvise_random(_id: TableId, _map: &mut memmap2::MmapMut) {}

#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
pub struct TableId(u16);

@@ -195,6 +182,20 @@ impl TableId {
pub fn total_entries(&self) -> u64 {
total_entries(self.index_bits())
}

pub fn log_index(&self) -> usize {
self.col() as usize * (64 - MIN_INDEX_BITS) as usize + self.index_bits() as usize
}

pub fn from_log_index(i: usize) -> Self {
let col = i / (64 - MIN_INDEX_BITS) as usize;
let bits = i % (64 - MIN_INDEX_BITS) as usize;
TableId::new(col as ColId, bits as u8)
}

pub const fn max_log_indicies(num_columns: usize) -> usize {
(64 - MIN_INDEX_BITS) as usize * num_columns
}
}
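
log_index packs a (column, index_bits) pair into a single slot number: max_log_indicies reserves 64 - MIN_INDEX_BITS = 48 slots per column, so column 3 with 20 index bits maps to 3 * 48 + 20 = 164, and from_log_index recovers (3, 20) by integer division and remainder. A test-style sketch of that round trip (assumes it lives in this module, where TableId and MIN_INDEX_BITS are in scope):

// Sketch: round-trip check for the (column, index_bits) <-> log index
// packing described above.
#[test]
fn log_index_round_trip() {
	let id = TableId::new(3, 20);
	assert_eq!(id.log_index(), 3 * (64 - MIN_INDEX_BITS) as usize + 20);
	assert_eq!(TableId::from_log_index(id.log_index()), id);
}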

impl std::fmt::Display for TableId {
@@ -216,7 +217,7 @@ impl IndexTable {

try_io!(file.set_len(file_size(id.index_bits())));
let mut map = try_io!(unsafe { memmap2::MmapMut::map_mut(&file) });
madvise_random(id, &mut map);
madvise_random(&mut map);
log::debug!(target: "parity-db", "Opened existing index {}", id);
Ok(Some(IndexTable { id, path, map: RwLock::new(Some(map)) }))
}
@@ -564,7 +565,7 @@ impl IndexTable {
log::debug!(target: "parity-db", "Created new index {}", self.id);
try_io!(file.set_len(file_size(self.id.index_bits())));
let mut mmap = try_io!(unsafe { memmap2::MmapMut::map_mut(&file) });
madvise_random(self.id, &mut mmap);
madvise_random(&mut mmap);
*wmap = Some(mmap);
map = RwLockWriteGuard::downgrade_to_upgradable(wmap);
}
@@ -641,7 +642,6 @@ mod test {
use super::*;
use rand::{Rng, SeedableRng};
use std::path::PathBuf;

#[cfg(feature = "bench")]
use test::Bencher;
#[cfg(feature = "bench")]
