Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use mmap IO for value tables #214

Merged
merged 18 commits into from
Jul 21, 2023
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "parity-db"
version = "0.4.9"
version = "0.4.10"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2021"
license = "MIT OR Apache-2.0"
Expand Down
4 changes: 2 additions & 2 deletions admin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,12 @@ pub fn run() -> Result<(), String> {

let mut db_options = options.clone();
if args.compress {
for mut c in &mut db_options.columns {
for c in &mut db_options.columns {
c.compression = parity_db::CompressionType::Lz4;
}
}
if args.uniform {
for mut c in &mut db_options.columns {
for c in &mut db_options.columns {
c.uniform = true;
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use std::{
},
};

const MIN_INDEX_BITS: u8 = 16;
pub const MIN_INDEX_BITS: u8 = 16;
// Measured in index entries
const MAX_REINDEX_BATCH: usize = 8192;

Expand Down
198 changes: 88 additions & 110 deletions src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,19 @@
//! Utilities for db file.

use crate::{
error::{try_io, Error, Result},
parking_lot::{RwLock, RwLockUpgradableReadGuard, RwLockWriteGuard},
error::{try_io, Result},
parking_lot::RwLock,
table::TableId,
};
use std::sync::atomic::{AtomicU64, Ordering};

#[cfg(not(test))]
const RESERVE_ADDRESS_SPACE: usize = 1024 * 1024 * 1024; // 1 Gb

// Use different value for tests to work around docker limits on the test machine.
#[cfg(test)]
const RESERVE_ADDRESS_SPACE: usize = 64 * 1024 * 1024; // 64 Mb
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering if this could be configurable (probably these kind of limitation might exist on some cloud infra).
Maybe even a bit more granular (eg more reserved for log).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll give it a round of testing on the parity infrastructure. I've tested on my arm64 vm and had no issues.


#[cfg(target_os = "linux")]
fn disable_read_ahead(file: &std::fs::File) -> std::io::Result<()> {
use std::os::unix::io::AsRawFd;
Expand All @@ -36,42 +43,34 @@ fn disable_read_ahead(_file: &std::fs::File) -> std::io::Result<()> {
Ok(())
}

// `File::sync_data` uses F_FULLSYNC fcntl on MacOS. It it supposed to be
// the safest way to make sure data is fully persisted. However starting from
// MacOS 11.0 it severely degrades parallel write performance, even when writing to
// other files. Regular `fsync` is good enough for our use case.
// SSDs used in modern macs seem to be able to flush data even on unexpected power loss.
// We performed some testing with power shutdowns and kernel panics on both mac hardware
// and VMs and in all cases `fsync` was enough to prevent data corruption.
#[cfg(target_os = "macos")]
fn fsync(file: &std::fs::File) -> std::io::Result<()> {
use std::os::unix::io::AsRawFd;
if unsafe { libc::fsync(file.as_raw_fd()) } != 0 {
Err(std::io::Error::last_os_error())
} else {
Ok(())
#[cfg(unix)]
pub fn madvise_random(map: &mut memmap2::MmapMut) {
unsafe {
libc::madvise(map.as_mut_ptr() as _, map.len(), libc::MADV_RANDOM);
}
}

#[cfg(not(target_os = "macos"))]
fn fsync(file: &std::fs::File) -> std::io::Result<()> {
file.sync_data()
}
#[cfg(not(unix))]
pub fn madvise_random(_id: TableId, _map: &mut memmap2::MmapMut) {}

const GROW_SIZE_BYTES: u64 = 256 * 1024;

#[derive(Debug)]
pub struct TableFile {
pub file: RwLock<Option<std::fs::File>>,
pub map: RwLock<Option<(memmap2::MmapMut, std::fs::File)>>,
pub path: std::path::PathBuf,
pub capacity: AtomicU64,
pub id: TableId,
}

fn map_len(file_len: u64) -> usize {
file_len as usize + RESERVE_ADDRESS_SPACE
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wonder if we also should try to align the size?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm made is so that file size is always aligned.

}

impl TableFile {
pub fn open(filepath: std::path::PathBuf, entry_size: u16, id: TableId) -> Result<Self> {
let mut capacity = 0u64;
let file = if std::fs::metadata(&filepath).is_ok() {
let map = if std::fs::metadata(&filepath).is_ok() {
let file = try_io!(std::fs::OpenOptions::new()
.read(true)
.write(true)
Expand All @@ -81,17 +80,20 @@ impl TableFile {
if len == 0 {
// Preallocate.
capacity += GROW_SIZE_BYTES / entry_size as u64;
try_io!(file.set_len(capacity * entry_size as u64));
try_io!(file.set_len(GROW_SIZE_BYTES));
} else {
capacity = len / entry_size as u64;
}
Some(file)
let mut map =
try_io!(unsafe { memmap2::MmapOptions::new().len(map_len(len)).map_mut(&file) });
madvise_random(&mut map);
Some((map, file))
} else {
None
};
Ok(TableFile {
path: filepath,
file: RwLock::new(file),
map: RwLock::new(map),
capacity: AtomicU64::new(capacity),
id,
})
Expand All @@ -108,111 +110,87 @@ impl TableFile {
Ok(file)
}

#[cfg(unix)]
pub fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<()> {
use std::os::unix::fs::FileExt;
try_io!(self
.file
.read()
.as_ref()
.ok_or_else(|| Error::Corruption("File does not exist.".into()))?
.read_exact_at(buf, offset));
Ok(())
}

#[cfg(unix)]
pub fn write_at(&self, buf: &[u8], offset: u64) -> Result<()> {
use std::os::unix::fs::FileExt;
try_io!(self.file.read().as_ref().unwrap().write_all_at(buf, offset));
let offset = offset as usize;
let map = self.map.read();
let (map, _) = map.as_ref().unwrap();
buf.copy_from_slice(&map[offset..offset + buf.len()]);
Ok(())
}

#[cfg(windows)]
pub fn read_at(&self, mut buf: &mut [u8], mut offset: u64) -> Result<()> {
use crate::error::Error;
use std::{io, os::windows::fs::FileExt};

let file = self.file.read();
let file = file.as_ref().ok_or_else(|| Error::Corruption("File does not exist.".into()))?;

while !buf.is_empty() {
match file.seek_read(buf, offset) {
Ok(0) => break,
Ok(n) => {
buf = &mut buf[n..];
offset += n as u64;
},
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {
// Try again
},
Err(e) => return Err(Error::Io(e)),
}
}

if !buf.is_empty() {
Err(Error::Io(io::Error::new(
io::ErrorKind::UnexpectedEof,
"failed to fill whole buffer",
)))
} else {
Ok(())
}
pub fn slice_at(&self, offset: u64, len: usize) -> &[u8] {
let offset = offset as usize;
let map = self.map.read();
let (map, _) = map.as_ref().unwrap();
let data: &[u8] = unsafe {
let ptr = map.as_ptr().add(offset);
std::slice::from_raw_parts(ptr, len)
};
data
}

#[cfg(windows)]
pub fn write_at(&self, mut buf: &[u8], mut offset: u64) -> Result<()> {
use crate::error::Error;
use std::{io, os::windows::fs::FileExt};

let file = self.file.read();
let file = file.as_ref().unwrap();

while !buf.is_empty() {
match file.seek_write(buf, offset) {
Ok(0) =>
return Err(Error::Io(io::Error::new(
io::ErrorKind::WriteZero,
"failed to write whole buffer",
))),
Ok(n) => {
buf = &buf[n..];
offset += n as u64;
},
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {
// Try again
},
Err(e) => return Err(Error::Io(e)),
}
}

pub fn write_at(&self, buf: &[u8], offset: u64) -> Result<()> {
let map = self.map.read();
let (map, _) = map.as_ref().unwrap();
let offset = offset as usize;

// Nasty mutable pointer cast. We do ensure that all chunks that are being written are
// accessed through the overlay in other threads.
let ptr: *mut u8 = map.as_ptr() as *mut u8;
let data: &mut [u8] = unsafe {
let ptr = ptr.add(offset);
std::slice::from_raw_parts_mut(ptr, buf.len())
};
data.copy_from_slice(buf);
Ok(())
}

pub fn grow(&self, entry_size: u16) -> Result<()> {
let mut capacity = self.capacity.load(Ordering::Relaxed);
capacity += GROW_SIZE_BYTES / entry_size as u64;

let mut map_and_file = self.map.write();
let new_len = match map_and_file.as_mut() {
None => {
let file = self.create_file()?;
let len = GROW_SIZE_BYTES;
try_io!(file.set_len(len));
let mut map = try_io!(unsafe {
memmap2::MmapOptions::new().len(RESERVE_ADDRESS_SPACE).map_mut(&file)
});
madvise_random(&mut map);
*map_and_file = Some((map, file));
len
},
Some((map, file)) => {
let new_len = try_io!(file.metadata()).len() + GROW_SIZE_BYTES;
try_io!(file.set_len(new_len));
if map.len() < new_len as usize {
let mut new_map = try_io!(unsafe {
memmap2::MmapOptions::new().len(map_len(new_len)).map_mut(&*file)
});
madvise_random(&mut new_map);
let old_map = std::mem::replace(map, new_map);
try_io!(old_map.flush());
// Leak the old mapping as there might be concurrent readers.
std::mem::forget(old_map);
Comment on lines +172 to +173
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this leak not an issue? I'm trying to identify the root cause of #233 and this looks like a candidate.

}
new_len
},
};
let capacity = new_len / entry_size as u64;
self.capacity.store(capacity, Ordering::Relaxed);
let mut file = self.file.upgradable_read();
if file.is_none() {
let mut wfile = RwLockUpgradableReadGuard::upgrade(file);
*wfile = Some(self.create_file()?);
file = RwLockWriteGuard::downgrade_to_upgradable(wfile);
}
try_io!(file.as_ref().unwrap().set_len(capacity * entry_size as u64));
Ok(())
}

pub fn flush(&self) -> Result<()> {
if let Some(file) = self.file.read().as_ref() {
cheme marked this conversation as resolved.
Show resolved Hide resolved
try_io!(fsync(file));
if let Some((map, _)) = self.map.read().as_ref() {
try_io!(map.flush());
}
Ok(())
}

pub fn remove(&self) -> Result<()> {
let mut file = self.file.write();
if let Some(file) = file.take() {
let mut map = self.map.write();
if let Some((map, file)) = map.take() {
drop(map);
drop(file);
try_io!(std::fs::remove_file(&self.path));
}
Expand Down
36 changes: 18 additions & 18 deletions src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
// This file is dual-licensed as Apache-2.0 or MIT.

use crate::{
column::ColId,
column::{ColId, MIN_INDEX_BITS},
display::hex,
error::{try_io, Error, Result},
file::madvise_random,
log::{LogQuery, LogReader, LogWriter},
parking_lot::{RwLock, RwLockUpgradableReadGuard, RwLockWriteGuard},
stats::{self, ColumnStats},
Expand Down Expand Up @@ -142,20 +143,6 @@ fn file_size(index_bits: u8) -> u64 {
total_entries(index_bits) * 8 + META_SIZE as u64
}

#[cfg(unix)]
fn madvise_random(id: TableId, map: &mut memmap2::MmapMut) {
unsafe {
libc::madvise(
map.as_mut_ptr() as _,
file_size(id.index_bits()) as usize,
libc::MADV_RANDOM,
);
}
}

#[cfg(not(unix))]
fn madvise_random(_id: TableId, _map: &mut memmap2::MmapMut) {}

#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
pub struct TableId(u16);

Expand Down Expand Up @@ -195,6 +182,20 @@ impl TableId {
pub fn total_entries(&self) -> u64 {
total_entries(self.index_bits())
}

pub fn log_index(&self) -> usize {
self.col() as usize * (64 - MIN_INDEX_BITS) as usize + self.index_bits() as usize
}

pub fn from_log_index(i: usize) -> Self {
let col = i / (64 - MIN_INDEX_BITS) as usize;
let bits = i % (64 - MIN_INDEX_BITS) as usize;
TableId::new(col as ColId, bits as u8)
}

pub const fn max_log_indicies(num_columns: usize) -> usize {
(64 - MIN_INDEX_BITS) as usize * num_columns
}
}

impl std::fmt::Display for TableId {
Expand All @@ -216,7 +217,7 @@ impl IndexTable {

try_io!(file.set_len(file_size(id.index_bits())));
let mut map = try_io!(unsafe { memmap2::MmapMut::map_mut(&file) });
madvise_random(id, &mut map);
madvise_random(&mut map);
log::debug!(target: "parity-db", "Opened existing index {}", id);
Ok(Some(IndexTable { id, path, map: RwLock::new(Some(map)) }))
}
Expand Down Expand Up @@ -564,7 +565,7 @@ impl IndexTable {
log::debug!(target: "parity-db", "Created new index {}", self.id);
try_io!(file.set_len(file_size(self.id.index_bits())));
let mut mmap = try_io!(unsafe { memmap2::MmapMut::map_mut(&file) });
madvise_random(self.id, &mut mmap);
madvise_random(&mut mmap);
*wmap = Some(mmap);
map = RwLockWriteGuard::downgrade_to_upgradable(wmap);
}
Expand Down Expand Up @@ -641,7 +642,6 @@ mod test {
use super::*;
use rand::{Rng, SeedableRng};
use std::path::PathBuf;

#[cfg(feature = "bench")]
use test::Bencher;
#[cfg(feature = "bench")]
Expand Down
Loading
Loading