diff --git a/src/metrics.rs b/src/metrics.rs
index 3e8c54d63..fd99642ea 100644
--- a/src/metrics.rs
+++ b/src/metrics.rs
@@ -266,7 +266,7 @@ impl Metrics {
             + self.tree_cas.count()
             + self.tree_scan.count()
             + self.tree_reverse_scan.count();
-        let loop_pct = total_loops * 100 / total_ops;
+        let loop_pct = total_loops * 100 / (total_ops + 1);
         println!(
             "tree contention loops: {} ({}% retry rate)",
             total_loops, loop_pct
@@ -293,7 +293,7 @@ impl Metrics {
             lat("page_out", &self.page_out),
         ]);
         let hit_ratio = (self.get_page.count() - self.pull.count()) * 100
-            / std::cmp::max(1, self.get_page.count());
+            / (self.get_page.count() + 1);
         println!("hit ratio: {}%", hit_ratio);
         println!("{}", std::iter::repeat("-").take(134).collect::<String>());

@@ -324,7 +324,7 @@ impl Metrics {
             self.log_reservation_attempts.load(Acquire);
         let log_reservation_retry_rate = (log_reservation_attempts
             - log_reservations) * 100
-            / log_reservations;
+            / (log_reservations + 1);
         println!("log reservations: {}", log_reservations);
         println!(
             "log res attempts: {}, ({}% retry rate)",
diff --git a/src/pagecache/iobuf.rs b/src/pagecache/iobuf.rs
index 58dd333ff..77fe6259a 100644
--- a/src/pagecache/iobuf.rs
+++ b/src/pagecache/iobuf.rs
@@ -1,5 +1,6 @@
 use std::{
     alloc::{alloc, dealloc, Layout},
+    cell::UnsafeCell,
     sync::atomic::{AtomicBool, AtomicPtr},
 };

@@ -61,7 +62,7 @@ impl Drop for AlignedBuf {
 }

 pub(crate) struct IoBuf {
-    buf: Arc<AlignedBuf>,
+    buf: Arc<UnsafeCell<AlignedBuf>>,
     header: CachePadded<AtomicU64>,
     base: usize,
     pub offset: LogOffset,
@@ -75,15 +76,45 @@ pub(crate) struct IoBuf {
 #[allow(unsafe_code)]
 unsafe impl Sync for IoBuf {}

+#[allow(unsafe_code)]
+unsafe impl Send for IoBuf {}
+
 impl IoBuf {
+    /// # Safety
+    ///
+    /// This operation provides access to a mutable buffer of
+    /// uninitialized memory. For this to be correct, we must
+    /// ensure that:
+    /// 1. overlapping mutable slices are never created.
+    /// 2. a read to any subslice of this slice only happens
+    ///    after a write has initialized that memory
+    ///
+    /// It is intended that the log reservation code guarantees
+    /// that no two `Reservation` objects will hold overlapping
+    /// mutable slices to our io buffer.
+    ///
+    /// It is intended that the `write_to_log` function only
+    /// tries to write initialized bytes to the underlying storage.
+    ///
+    /// It is intended that the `write_to_log` function will
+    /// initialize any yet-to-be-initialized bytes before writing
+    /// the buffer to storage. #1040 added logic that was intended
+    /// to meet this requirement.
+    ///
+    /// The safety of this method was discussed in #1044.
     pub(crate) fn get_mut_range(
         &self,
         at: usize,
         len: usize,
     ) -> &'static mut [u8] {
-        assert!(self.buf.1 >= at + len);
+        let buf_ptr = self.buf.get();
+
         unsafe {
-            std::slice::from_raw_parts_mut(self.buf.0.add(self.base + at), len)
+            assert!((*buf_ptr).1 >= at + len);
+            std::slice::from_raw_parts_mut(
+                (*buf_ptr).0.add(self.base + at),
+                len,
+            )
         }
     }

@@ -121,7 +152,7 @@ impl IoBuf {
         unsafe {
             std::ptr::copy_nonoverlapping(
                 header_bytes.as_ptr(),
-                self.buf.0,
+                (*self.buf.get()).0,
                 SEG_HEADER_LEN,
             );
         }
@@ -295,7 +326,7 @@ impl IoBufs {
         );

         let mut iobuf = IoBuf {
-            buf: Arc::new(AlignedBuf::new(segment_size)),
+            buf: Arc::new(UnsafeCell::new(AlignedBuf::new(segment_size))),
             header: CachePadded::new(AtomicU64::new(0)),
             base: 0,
             offset: lid,
@@ -319,7 +350,7 @@ impl IoBufs {
         );

         IoBuf {
-            buf: Arc::new(AlignedBuf::new(segment_size)),
+            buf: Arc::new(UnsafeCell::new(AlignedBuf::new(segment_size))),
             header: CachePadded::new(AtomicU64::new(0)),
             base,
             offset: next_lid,
@@ -573,7 +604,10 @@ impl IoBufs {
         let header_bytes = header.serialize();

         // initialize the remainder of this buffer (only pad_len of this will be part of the Cap message)
-        let padding_bytes = vec![MessageKind::Corrupted.into(); unused_space - header_bytes.len()];
+        let padding_bytes = vec![
+            MessageKind::Corrupted.into();
+            unused_space - header_bytes.len()
+        ];

         #[allow(unsafe_code)]
         unsafe {
@@ -1040,7 +1074,7 @@ pub(in crate::pagecache) fn maybe_seal_and_write_iobuf(
     // its entire life cycle as soon as we do that.
     let next_iobuf = if maxed {
         let mut next_iobuf = IoBuf {
-            buf: Arc::new(AlignedBuf::new(segment_size)),
+            buf: Arc::new(UnsafeCell::new(AlignedBuf::new(segment_size))),
             header: CachePadded::new(AtomicU64::new(0)),
             base: 0,
             offset: next_offset,
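
For context on the iobuf.rs change: wrapping the aligned buffer in `UnsafeCell` makes the interior mutation behind `&self` explicit, the new `unsafe impl Send` restores the thread-safety claim the raw pointer would otherwise forfeit, and the `# Safety` comment spells out the contract callers must uphold. The sketch below illustrates that pattern in isolation; it is a minimal, self-contained approximation, not sled's actual `IoBuf`/`AlignedBuf` code, and the `SharedBuf`, `RawBuf`, and `with_capacity` names are hypothetical.

```rust
use std::cell::UnsafeCell;
use std::sync::Arc;

/// Hypothetical stand-in for sled's aligned buffer: a raw pointer plus a capacity.
struct RawBuf(*mut u8, usize);

/// Shared, internally mutable buffer, analogous in spirit to the
/// `Arc<UnsafeCell<AlignedBuf>>` field introduced in the diff.
struct SharedBuf {
    buf: Arc<UnsafeCell<RawBuf>>,
}

// As with `IoBuf`, the raw pointer makes the type !Send/!Sync by default, so
// the thread-safety claim has to be made explicitly and justified by the
// usage contract (non-overlapping ranges, write-before-read).
#[allow(unsafe_code)]
unsafe impl Send for SharedBuf {}
#[allow(unsafe_code)]
unsafe impl Sync for SharedBuf {}

impl SharedBuf {
    fn with_capacity(len: usize) -> SharedBuf {
        // Leak a zeroed allocation so the `'static` lifetime below is
        // trivially sound for this sketch.
        let mem: &'static mut [u8] = Box::leak(vec![0u8; len].into_boxed_slice());
        SharedBuf {
            buf: Arc::new(UnsafeCell::new(RawBuf(mem.as_mut_ptr(), mem.len()))),
        }
    }

    /// # Safety
    /// Callers must never create overlapping ranges, and must fully write a
    /// range before anyone reads it -- the same contract documented on
    /// `get_mut_range` in the diff.
    #[allow(unsafe_code)]
    unsafe fn get_mut_range(&self, at: usize, len: usize) -> &'static mut [u8] {
        let raw = self.buf.get();
        assert!((*raw).1 >= at + len);
        std::slice::from_raw_parts_mut((*raw).0.add(at), len)
    }
}

fn main() {
    let shared = SharedBuf::with_capacity(16);
    // Two disjoint "reservations", as the log reservation code is meant to ensure.
    let (a, b) = unsafe { (shared.get_mut_range(0, 8), shared.get_mut_range(8, 8)) };
    a.copy_from_slice(b"first!!!");
    b.copy_from_slice(b"second!!");
    println!("{:?} {:?}", a, b);
}
```

The `&'static mut [u8]` return type matches the signature shown in the diff; the sketch makes it trivially sound by leaking its allocation, whereas the real buffer's lifetime is managed through the `Arc`.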
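The metrics.rs hunks all apply the same guard: add 1 to the denominator so percentage calculations cannot divide by zero when nothing has been counted yet. Compared with the `std::cmp::max(1, n)` clamp it replaces in the hit-ratio case, the `n + 1` form also nudges nonzero denominators, which only matters for very small counts. A minimal sketch of the pattern; the `retry_rate_pct` helper is hypothetical, not part of sled's `Metrics`:

```rust
/// Hypothetical helper showing the denominator guard applied in metrics.rs:
/// adding 1 avoids a divide-by-zero panic when nothing has been counted yet.
fn retry_rate_pct(retries: u64, attempts: u64) -> u64 {
    retries * 100 / (attempts + 1)
}

fn main() {
    // With all counters still at zero, the unguarded division would panic;
    // the guarded form simply reports 0%.
    assert_eq!(retry_rate_pct(0, 0), 0);
    assert_eq!(retry_rate_pct(50, 99), 50);
    println!("retry rate: {}%", retry_rate_pct(50, 99));
}
```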