Use mmap IO for value tables #214
Merged

Commits (18)
cbe07fa  Experimental mmap IO for tables (arkpar)
fe713a6  Merge branch 'master' of github.com:paritytech/parity-db into arkpar/… (arkpar)
66af345  Eliminate extra copy on read (arkpar)
80eac43  Reclaim overlay mem (arkpar)
4523ab9  Tweak reclaim params (arkpar)
82b484c  Bump version (arkpar)
7fbd11b  Style (arkpar)
a69d80e  Style (arkpar)
9242579  Loom workaround (arkpar)
e5b029e  Loom workaround (arkpar)
5436842  fmt (arkpar)
b6e3ac9  Destroy old map first (arkpar)
ebc75c5  Really destroy old map first (arkpar)
e40f657  Reserve address space for the file mapping (arkpar)
baa49ac  aarch64 CI test (arkpar)
7467d3e  aarch64 CI test (arkpar)
bc2b7d2  Align file size (arkpar)
878a2d5  aarch64 CI test fix (arkpar)

Diff view (changes from 14 commits)

@@ -4,12 +4,14 @@
 //! Utilities for db file.
 
 use crate::{
-	error::{try_io, Error, Result},
-	parking_lot::{RwLock, RwLockUpgradableReadGuard, RwLockWriteGuard},
+	error::{try_io, Result},
+	parking_lot::RwLock,
 	table::TableId,
 };
 use std::sync::atomic::{AtomicU64, Ordering};
 
+const RESERVE_ADDRESS_SPACE: usize = 1024 * 1024 * 1024;
+
 #[cfg(target_os = "linux")]
 fn disable_read_ahead(file: &std::fs::File) -> std::io::Result<()> {
 	use std::os::unix::io::AsRawFd;
@@ -36,42 +38,34 @@ fn disable_read_ahead(_file: &std::fs::File) -> std::io::Result<()> {
 	Ok(())
 }
 
-// `File::sync_data` uses F_FULLSYNC fcntl on MacOS. It it supposed to be
-// the safest way to make sure data is fully persisted. However starting from
-// MacOS 11.0 it severely degrades parallel write performance, even when writing to
-// other files. Regular `fsync` is good enough for our use case.
-// SSDs used in modern macs seem to be able to flush data even on unexpected power loss.
-// We performed some testing with power shutdowns and kernel panics on both mac hardware
-// and VMs and in all cases `fsync` was enough to prevent data corruption.
-#[cfg(target_os = "macos")]
-fn fsync(file: &std::fs::File) -> std::io::Result<()> {
-	use std::os::unix::io::AsRawFd;
-	if unsafe { libc::fsync(file.as_raw_fd()) } != 0 {
-		Err(std::io::Error::last_os_error())
-	} else {
-		Ok(())
+#[cfg(unix)]
+pub fn madvise_random(map: &mut memmap2::MmapMut) {
+	unsafe {
+		libc::madvise(map.as_mut_ptr() as _, map.len(), libc::MADV_RANDOM);
 	}
 }
 
-#[cfg(not(target_os = "macos"))]
-fn fsync(file: &std::fs::File) -> std::io::Result<()> {
-	file.sync_data()
-}
+#[cfg(not(unix))]
+pub fn madvise_random(_id: TableId, _map: &mut memmap2::MmapMut) {}
 
 const GROW_SIZE_BYTES: u64 = 256 * 1024;
 
 #[derive(Debug)]
 pub struct TableFile {
-	pub file: RwLock<Option<std::fs::File>>,
+	pub map: RwLock<Option<(memmap2::MmapMut, std::fs::File)>>,
 	pub path: std::path::PathBuf,
 	pub capacity: AtomicU64,
 	pub id: TableId,
 }
 
+fn map_len(file_len: u64) -> usize {
+	file_len as usize + RESERVE_ADDRESS_SPACE
+}
+
 impl TableFile {
 	pub fn open(filepath: std::path::PathBuf, entry_size: u16, id: TableId) -> Result<Self> {
 		let mut capacity = 0u64;
-		let file = if std::fs::metadata(&filepath).is_ok() {
+		let map = if std::fs::metadata(&filepath).is_ok() {
 			let file = try_io!(std::fs::OpenOptions::new()
 				.read(true)
 				.write(true)
@@ -85,13 +79,16 @@ impl TableFile {
 			} else {
 				capacity = len / entry_size as u64;
 			}
-			Some(file)
+			let mut map =
+				try_io!(unsafe { memmap2::MmapOptions::new().len(map_len(len)).map_mut(&file) });
+			madvise_random(&mut map);
+			Some((map, file))
 		} else {
 			None
 		};
 		Ok(TableFile {
 			path: filepath,
-			file: RwLock::new(file),
+			map: RwLock::new(map),
 			capacity: AtomicU64::new(capacity),
 			id,
 		})
@@ -108,83 +105,38 @@ impl TableFile {
 		Ok(file)
 	}
 
-	#[cfg(unix)]
 	pub fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<()> {
-		use std::os::unix::fs::FileExt;
-		try_io!(self
-			.file
-			.read()
-			.as_ref()
-			.ok_or_else(|| Error::Corruption("File does not exist.".into()))?
-			.read_exact_at(buf, offset));
-		Ok(())
-	}
-
-	#[cfg(unix)]
-	pub fn write_at(&self, buf: &[u8], offset: u64) -> Result<()> {
-		use std::os::unix::fs::FileExt;
-		try_io!(self.file.read().as_ref().unwrap().write_all_at(buf, offset));
+		let offset = offset as usize;
+		let map = self.map.read();
+		let (map, _) = map.as_ref().unwrap();
+		buf.copy_from_slice(&map[offset..offset + buf.len()]);
 		Ok(())
 	}
 
-	#[cfg(windows)]
-	pub fn read_at(&self, mut buf: &mut [u8], mut offset: u64) -> Result<()> {
-		use crate::error::Error;
-		use std::{io, os::windows::fs::FileExt};
-
-		let file = self.file.read();
-		let file = file.as_ref().ok_or_else(|| Error::Corruption("File does not exist.".into()))?;
-
-		while !buf.is_empty() {
-			match file.seek_read(buf, offset) {
-				Ok(0) => break,
-				Ok(n) => {
-					buf = &mut buf[n..];
-					offset += n as u64;
-				},
-				Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {
-					// Try again
-				},
-				Err(e) => return Err(Error::Io(e)),
-			}
-		}
-
-		if !buf.is_empty() {
-			Err(Error::Io(io::Error::new(
-				io::ErrorKind::UnexpectedEof,
-				"failed to fill whole buffer",
-			)))
-		} else {
-			Ok(())
-		}
+	pub fn slice_at(&self, offset: u64, len: usize) -> &[u8] {
+		let offset = offset as usize;
+		let map = self.map.read();
+		let (map, _) = map.as_ref().unwrap();
+		let data: &[u8] = unsafe {
+			let ptr = map.as_ptr().add(offset);
+			std::slice::from_raw_parts(ptr, len)
+		};
+		data
 	}
 
-	#[cfg(windows)]
-	pub fn write_at(&self, mut buf: &[u8], mut offset: u64) -> Result<()> {
-		use crate::error::Error;
-		use std::{io, os::windows::fs::FileExt};
-
-		let file = self.file.read();
-		let file = file.as_ref().unwrap();
-
-		while !buf.is_empty() {
-			match file.seek_write(buf, offset) {
-				Ok(0) =>
-					return Err(Error::Io(io::Error::new(
-						io::ErrorKind::WriteZero,
-						"failed to write whole buffer",
-					))),
-				Ok(n) => {
-					buf = &buf[n..];
-					offset += n as u64;
-				},
-				Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {
-					// Try again
-				},
-				Err(e) => return Err(Error::Io(e)),
-			}
-		}
-
+	pub fn write_at(&self, buf: &[u8], offset: u64) -> Result<()> {
+		let map = self.map.read();
+		let (map, _) = map.as_ref().unwrap();
+		let offset = offset as usize;
+
+		// Nasty mutable pointer cast. We do ensure that all chunks that are being written are
+		// accessed through the overlay in other threads.
+		let ptr: *mut u8 = map.as_ptr() as *mut u8;
+		let data: &mut [u8] = unsafe {
+			let ptr = ptr.add(offset);
+			std::slice::from_raw_parts_mut(ptr, buf.len())
+		};
+		data.copy_from_slice(buf);
 		Ok(())
 	}
 
@@ -193,26 +145,46 @@ impl TableFile {
 		capacity += GROW_SIZE_BYTES / entry_size as u64;
 
 		self.capacity.store(capacity, Ordering::Relaxed);
-		let mut file = self.file.upgradable_read();
-		if file.is_none() {
-			let mut wfile = RwLockUpgradableReadGuard::upgrade(file);
-			*wfile = Some(self.create_file()?);
-			file = RwLockWriteGuard::downgrade_to_upgradable(wfile);
+		let mut map_and_file = self.map.write();
+		match map_and_file.as_mut() {
+			None => {
+				let file = self.create_file()?;
+				try_io!(file.set_len(capacity * entry_size as u64));
+				let mut map = try_io!(unsafe {
+					memmap2::MmapOptions::new().len(RESERVE_ADDRESS_SPACE).map_mut(&file)
+				});
+				madvise_random(&mut map);
+				*map_and_file = Some((map, file));
+			},
+			Some((map, file)) => {
+				let new_len = capacity * entry_size as u64;
+				try_io!(file.set_len(new_len));
+				if map.len() < new_len as usize {
+					let mut new_map = try_io!(unsafe {
+						memmap2::MmapOptions::new().len(map_len(new_len)).map_mut(&*file)
+					});
+					madvise_random(&mut new_map);
+					let old_map = std::mem::replace(map, new_map);
+					try_io!(old_map.flush());
+					// Leak the old mapping as there might be concurrent readers.
+					std::mem::forget(old_map);

Review comment on lines +172 to +173: Why is this leak not an issue? I'm trying to identify the root cause of #233 and this looks like a candidate.

+				}
+			},
 		}
-		try_io!(file.as_ref().unwrap().set_len(capacity * entry_size as u64));
 		Ok(())
 	}
 
 	pub fn flush(&self) -> Result<()> {
-		if let Some(file) = self.file.read().as_ref() {
-			try_io!(fsync(file));
+		if let Some((map, _)) = self.map.read().as_ref() {
+			try_io!(map.flush());
 		}
 		Ok(())
 	}

(cheme marked a review conversation on these lines as resolved.)

 	pub fn remove(&self) -> Result<()> {
-		let mut file = self.file.write();
-		if let Some(file) = file.take() {
+		let mut map = self.map.write();
+		if let Some((map, file)) = map.take() {
+			drop(map);
 			drop(file);
 			try_io!(std::fs::remove_file(&self.path));
 		}

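On the inline question about the leaked mapping: the remap step in grow() can be reduced to the standalone sketch below. The memmap2 calls mirror the diff, but the remap_grow helper, the demo file, and the main function are illustrative assumptions rather than the PR's code. The point is that dropping the old mapping would unmap pages that concurrent readers may still reference through slice_at-style borrows, so the old mapping is flushed and then deliberately leaked. Remaps should also be rare in practice, since map_len adds RESERVE_ADDRESS_SPACE on top of the file length and a remap only happens once the file outgrows the mapped length.

use std::{fs::OpenOptions, mem};

// Illustrative helper (not the PR's code): grow a file and its mapping while
// earlier readers may still hold raw pointers into the previous mapping.
fn remap_grow(
	file: &std::fs::File,
	map: &mut memmap2::MmapMut,
	new_len: u64,
) -> std::io::Result<()> {
	file.set_len(new_len)?;
	if map.len() < new_len as usize {
		// Map the file again at the new length (the PR also adds reserved address space).
		let new_map =
			unsafe { memmap2::MmapOptions::new().len(new_len as usize).map_mut(file)? };
		let old_map = mem::replace(map, new_map);
		old_map.flush()?;
		// Dropping `old_map` would munmap it, turning any slice a concurrent
		// reader still holds into a dangling pointer. Leaking keeps those
		// reads valid for the rest of the process lifetime.
		mem::forget(old_map);
	}
	Ok(())
}

fn main() -> std::io::Result<()> {
	let path = std::env::temp_dir().join("mmap_grow_demo.bin");
	let file = OpenOptions::new().read(true).write(true).create(true).open(&path)?;
	file.set_len(4096)?;
	let mut map = unsafe { memmap2::MmapOptions::new().len(4096).map_mut(&file)? };
	map[0] = 42;
	remap_grow(&file, &mut map, 8192)?;
	assert_eq!(map[0], 42); // the write is visible through the new mapping
	std::fs::remove_file(&path)?;
	Ok(())
}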
Review comment: Wonder if we also should try to align the size?

Reply: I made it so that the file size is always aligned.
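
On the alignment exchange above, a minimal sketch of the round-up arithmetic being discussed, assuming page-size (or grow-increment) granularity; the align_up helper and the constants used in main are illustrative and not taken from the PR.

// Illustrative helper, not the PR's implementation: round a length up to a
// multiple of `align` (e.g. the mmap page size or the grow increment).
fn align_up(len: u64, align: u64) -> u64 {
	assert!(align.is_power_of_two());
	(len + align - 1) & !(align - 1)
}

fn main() {
	const GROW_SIZE_BYTES: u64 = 256 * 1024; // same constant as in the diff
	// An odd entry size could otherwise yield an unaligned file length;
	// rounding up keeps the mapped length page-friendly.
	assert_eq!(align_up(307_323, 4096), 311_296);
	assert_eq!(align_up(GROW_SIZE_BYTES, 4096), GROW_SIZE_BYTES);
	println!("aligned lengths check out");
}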