Unsafe removal #1488

Open · wants to merge 7 commits into base: main

2 changes: 2 additions & 0 deletions benchmarks/criterion/Cargo.toml
@@ -14,4 +14,6 @@ harness = false
[dependencies]
criterion = "0.3.0"
sled = { path = "../.." }

[target.'cfg(any(target_os = "linux", target_os = "macos"))'.dependencies]
jemallocator = "0.3.2"
12 changes: 4 additions & 8 deletions benchmarks/criterion/benches/sled.rs
@@ -1,15 +1,11 @@
use criterion::{criterion_group, criterion_main, Criterion};

use jemallocator::Jemalloc;

use sled::Config;

#[cfg_attr(
// only enable jemalloc on linux and macos by default
any(target_os = "linux", target_os = "macos"),
global_allocator
)]
static ALLOC: Jemalloc = Jemalloc;
// only enable jemalloc on linux and macos by default
#[cfg(any(target_os = "linux", target_os = "macos"))]
#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;

fn counter() -> usize {
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
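Together with the Cargo.toml hunk above, this leaves jemalloc gated consistently: the dependency is declared only for linux/macos targets, and the allocator item is compiled only where that dependency exists, so the unconditional `use jemallocator::Jemalloc;` import is no longer needed. A standalone recap of the resulting bench-side pattern:

// Conditional global allocator, as in the diff above: the whole item is
// gated on the targets where the jemallocator dependency is declared.
#[cfg(any(target_os = "linux", target_os = "macos"))]
#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;

fn main() {
    // allocations below this point go through jemalloc on linux/macos
    let v: Vec<u8> = vec![0; 1024];
    assert_eq!(v.len(), 1024);
}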
56 changes: 0 additions & 56 deletions src/fastcmp.rs

This file was deleted.

2 changes: 0 additions & 2 deletions src/lib.rs
@@ -192,7 +192,6 @@ mod context;
mod db;
mod dll;
mod ebr;
mod fastcmp;
mod fastlock;
mod fnv;
mod histogram;
@@ -302,7 +301,6 @@ use {
pin as crossbeam_pin, Atomic, Guard as CrossbeamGuard, Owned,
Shared,
},
fastcmp::fastcmp,
lru::Lru,
meta::Meta,
node::Node,
5 changes: 2 additions & 3 deletions src/lru.rs
@@ -4,7 +4,6 @@ use std::{
borrow::{Borrow, BorrowMut},
convert::TryFrom,
hash::{Hash, Hasher},
mem::MaybeUninit,
sync::atomic::{AtomicPtr, AtomicUsize, Ordering},
};

@@ -38,7 +37,7 @@ impl Default for AccessBlock {
fn default() -> AccessBlock {
AccessBlock {
len: AtomicUsize::new(0),
block: unsafe { MaybeUninit::zeroed().assume_init() },
block: [(); MAX_QUEUE_ITEMS].map(|_| AtomicU64::default()),
next: AtomicPtr::default(),
}
}
@@ -53,7 +52,7 @@ impl AccessBlock {
fn new(item: CacheAccess) -> AccessBlock {
let mut ret = AccessBlock {
len: AtomicUsize::new(1),
block: unsafe { MaybeUninit::zeroed().assume_init() },
block: [(); MAX_QUEUE_ITEMS].map(|_| AtomicU64::default()),
next: AtomicPtr::default(),
};
ret.block[0] = AtomicU64::from(u64::from(item));
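The lru.rs hunk swaps `unsafe { MaybeUninit::zeroed().assume_init() }` for `[(); MAX_QUEUE_ITEMS].map(|_| AtomicU64::default())`, which initializes the fixed-size array of non-Copy atomics entirely in safe code. A minimal standalone sketch of the pattern (the struct and constant here are simplified stand-ins, not the real definitions in src/lru.rs):

use std::sync::atomic::{AtomicU64, Ordering};

// Illustrative constant; the real MAX_QUEUE_ITEMS lives in src/lru.rs.
const MAX_QUEUE_ITEMS: usize = 8;

struct AccessBlockSketch {
    len: usize,
    block: [AtomicU64; MAX_QUEUE_ITEMS],
}

impl AccessBlockSketch {
    fn new() -> Self {
        Self {
            len: 0,
            // `[(); N].map(...)` runs the closure once per element, so an
            // array of non-Copy AtomicU64s is built without any unsafe.
            block: [(); MAX_QUEUE_ITEMS].map(|_| AtomicU64::default()),
        }
    }
}

fn main() {
    let b = AccessBlockSketch::new();
    b.block[0].store(42, Ordering::Relaxed);
    assert_eq!(b.block[0].load(Ordering::Relaxed), 42);
    assert_eq!(b.len, 0);
}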
133 changes: 61 additions & 72 deletions src/node.rs
@@ -41,7 +41,8 @@ fn uninitialized_node(len: usize) -> Inner {
#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub struct Header {
// NB always lay out fields from largest to smallest to properly pack the struct
// NB always lay out fields from largest to smallest to properly pack the
// struct
pub next: Option<NonZeroU64>,
pub merging_child: Option<NonZeroU64>,
lo_len: u64,
@@ -96,16 +97,12 @@ fn apply_computed_distance(mut buf: &mut [u8], mut distance: usize) {

// TODO change to u64 or u128 output
// This function has several responsibilities:
// * `find` will call this when looking for the
// proper child pid on an index, with slice
// lengths that may or may not match
// * `KeyRef::Ord` and `KeyRef::distance` call
// this while performing node iteration,
// again with possibly mismatching slice
// lengths. Merging nodes together, or
// merging overlays into inner nodes
// will rely on this functionality, and
// it's possible for the lengths to vary.
// * `find` will call this when looking for the proper child pid on an index,
// with slice lengths that may or may not match
// * `KeyRef::Ord` and `KeyRef::distance` call this while performing node
// iteration, again with possibly mismatching slice lengths. Merging nodes
// together, or merging overlays into inner nodes will rely on this
// functionality, and it's possible for the lengths to vary.
//
// This is not a general-purpose function. It
// is not possible to determine distances when
@@ -412,9 +409,10 @@ impl<'a> Iterator for Iter<'a> {
(Some((_, Some(_))), None) => {
log::trace!("src/node.rs:114");
log::trace!("iterator returning {:?}", self.next_a);
return self.next_a.take().map(|(k, v)| {
(KeyRef::Slice(k), v.unwrap().as_ref())
});
return self
.next_a
.take()
.map(|(k, v)| (KeyRef::Slice(k), v.unwrap().as_ref()));
}
(Some((k_a, v_a_opt)), Some((k_b, _))) => {
let cmp = KeyRef::Slice(k_a).cmp(&k_b);
@@ -511,9 +509,10 @@ impl<'a> DoubleEndedIterator for Iter<'a> {
(Some((_, Some(_))), None) => {
log::trace!("src/node.rs:483");
log::trace!("iterator returning {:?}", self.next_back_a);
return self.next_back_a.take().map(|(k, v)| {
(KeyRef::Slice(k), v.unwrap().as_ref())
});
return self
.next_back_a
.take()
.map(|(k, v)| (KeyRef::Slice(k), v.unwrap().as_ref()));
}
(Some((k_a, Some(_))), Some((k_b, _))) if k_b > *k_a => {
log::trace!("src/node.rs:508");
@@ -522,18 +521,20 @@ impl<'a> DoubleEndedIterator for Iter<'a> {
}
(Some((k_a, Some(_))), Some((k_b, _))) if k_b < *k_a => {
log::trace!("iterator returning {:?}", self.next_back_a);
return self.next_back_a.take().map(|(k, v)| {
(KeyRef::Slice(k), v.unwrap().as_ref())
});
return self
.next_back_a
.take()
.map(|(k, v)| (KeyRef::Slice(k), v.unwrap().as_ref()));
}
(Some((k_a, Some(_))), Some((k_b, _))) if k_b == *k_a => {
// prefer overlay, discard node value
self.next_back_b.take();
log::trace!("src/node.rs:520");
log::trace!("iterator returning {:?}", self.next_back_a);
return self.next_back_a.take().map(|(k, v)| {
(KeyRef::Slice(k), v.unwrap().as_ref())
});
return self
.next_back_a
.take()
.map(|(k, v)| (KeyRef::Slice(k), v.unwrap().as_ref()));
}
_ => unreachable!(
"did not expect combination a: {:?} b: {:?}",
@@ -905,8 +906,9 @@ impl Node {
Some(Node { overlay: Default::default(), inner: new_inner })
}

/// `node_kv_pair` returns either the existing (node/key, value, current offset) tuple or
/// (node/key, none, future offset) where a node/key is node level encoded key.
/// `node_kv_pair` returns either the existing (node/key, value, current
/// offset) tuple or (node/key, none, future offset) where a node/key is
/// node level encoded key.
pub(crate) fn node_kv_pair<'a>(
&'a self,
key: &'a [u8],
@@ -949,7 +951,7 @@ impl Node {
return Some((
self.prefix_decode(self.inner.index_key(idx)),
self.inner.index_value(idx).into(),
))
));
}
Err(idx) => idx,
};
@@ -1018,7 +1020,7 @@ impl Node {
return Some((
self.prefix_decode(self.inner.index_key(idx)),
self.inner.index_value(idx).into(),
))
));
}
Err(idx) => idx,
};
@@ -1088,7 +1090,12 @@ impl Node {
let pid_bytes = self.index_value(idx);
let pid = u64::from_le_bytes(pid_bytes.try_into().unwrap());

log::trace!("index_next_node for key {:?} returning pid {} after searching node {:?}", key, pid, self);
log::trace!(
"index_next_node for key {:?} returning pid {} after searching node {:?}",
key,
pid,
self
);
(is_leftmost, pid)
}

@@ -1741,29 +1748,10 @@ impl Inner {
* (tf!(size_of::<usize>(), u32)
- u32::from(self.offset_bytes)));

let mut tmp = std::mem::MaybeUninit::<usize>::uninit();
let len = size_of::<usize>();

// we use unsafe code here because it cuts a significant number of
// CPU cycles on a simple insertion workload compared to using the
// more idiomatic approach of copying the correct number of bytes into
// a buffer initialized with zeroes. the seemingly "less" unsafe
// approach of using ptr::copy_nonoverlapping did not improve matters.
// using a match statement on offset_bytes and performing simpler
// casting for one or two bytes slowed things down due to increasing
// code size. this approach is branch-free and cut CPU usage of this
// function from 7-11% down to 0.5-2% in a monotonic insertion workload.
#[allow(unsafe_code)]
unsafe {
let ptr: *const u8 = self.ptr().add(start);
std::ptr::copy_nonoverlapping(
ptr,
tmp.as_mut_ptr() as *mut u8,
len,
);
*tmp.as_mut_ptr() &= mask;
tmp.assume_init()
}
usize::from_ne_bytes(self.buf()[start..start + len].try_into().unwrap())
& mask
}

fn set_offset(&mut self, index: usize, offset: usize) {
@@ -2217,10 +2205,17 @@ impl Inner {
{
// search key does not evenly fit based on
// our fixed stride length
log::trace!("failed to find, search: {:?} lo: {:?} \
log::trace!(
"failed to find, search: {:?} lo: {:?} \
prefix_len: {} distance: {} stride: {} offset: {} children: {}, node: {:?}",
key, self.lo(), self.prefix_len, distance,
stride.get(), offset, self.children, self
key,
self.lo(),
self.prefix_len,
distance,
stride.get(),
offset,
self.children,
self
);
return Err((offset + 1).min(self.children()));
}
@@ -2239,7 +2234,7 @@ impl Inner {
let mid = left + size / 2;

let l = self.index_key(mid);
let cmp = crate::fastcmp(l.unwrap_slice(), key);
let cmp = l.unwrap_slice().cmp(key);

if cmp == Less {
left = mid + 1;
@@ -2263,19 +2258,19 @@ impl Inner {
fn iter_keys(
&self,
) -> impl Iterator<Item = KeyRef<'_>>
+ ExactSizeIterator
+ DoubleEndedIterator
+ Clone {
+ ExactSizeIterator
+ DoubleEndedIterator
+ Clone {
(0..self.children()).map(move |idx| self.index_key(idx))
}

fn iter_index_pids(
&self,
) -> impl '_
+ Iterator<Item = u64>
+ ExactSizeIterator
+ DoubleEndedIterator
+ Clone {
+ Iterator<Item = u64>
+ ExactSizeIterator
+ DoubleEndedIterator
+ Clone {
assert!(self.is_index);
self.iter_values().map(move |pid_bytes| {
u64::from_le_bytes(pid_bytes.try_into().unwrap())
@@ -2308,21 +2303,13 @@ impl Inner {
pub(crate) fn hi(&self) -> Option<&[u8]> {
let start = tf!(self.lo_len) + size_of::<Header>();
let end = start + tf!(self.hi_len);
if start == end {
None
} else {
Some(&self.as_ref()[start..end])
}
if start == end { None } else { Some(&self.as_ref()[start..end]) }
}

fn hi_mut(&mut self) -> Option<&mut [u8]> {
let start = tf!(self.lo_len) + size_of::<Header>();
let end = start + tf!(self.hi_len);
if start == end {
None
} else {
Some(&mut self.as_mut()[start..end])
}
if start == end { None } else { Some(&mut self.as_mut()[start..end]) }
}

fn index_key(&self, idx: usize) -> KeyRef<'_> {
@@ -3000,7 +2987,8 @@ mod test {

#[test]
fn node_bug_02() {
// postmortem: the test code had some issues with handling invalid keys for nodes
// postmortem: the test code had some issues with handling invalid keys
// for nodes
let node = Inner::new(
&[47, 97][..],
None,
@@ -3057,7 +3045,8 @@ mod test {
#[test]
fn node_bug_05() {
// postmortem: `prop_indexable` did not account for the requirement
// of feeding sorted items that are >= the lo key to the Node::new method.
// of feeding sorted items that are >= the lo key to the Node::new
// method.
assert!(prop_indexable(
vec![1],
vec![],
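The offset accessor above replaces a `MaybeUninit<usize>` plus `ptr::copy_nonoverlapping` read with `usize::from_ne_bytes` over a subslice, masked down to `offset_bytes` significant bytes. A sketch of that safe masked read in isolation (function and argument names are illustrative, not taken from `Inner`):

use std::convert::TryInto;
use std::mem::size_of;

// Read a packed offset out of `buf` starting at `start`, keeping only the
// low `offset_bytes` bytes, mirroring the rewritten accessor: a plain
// native-endian slice read plus a mask, no MaybeUninit or raw copies.
fn read_offset(buf: &[u8], start: usize, offset_bytes: u32) -> usize {
    let mask = usize::MAX
        >> (8 * (size_of::<usize>() as u32 - offset_bytes));
    let raw = usize::from_ne_bytes(
        buf[start..start + size_of::<usize>()].try_into().unwrap(),
    );
    raw & mask
}

fn main() {
    let mut buf = [0u8; 16];
    // store 0x030201 in native byte order at index 4
    buf[4..4 + size_of::<usize>()]
        .copy_from_slice(&0x0003_0201usize.to_ne_bytes());
    assert_eq!(read_offset(&buf, 4, 3), 0x0003_0201);
}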
119 changes: 66 additions & 53 deletions src/pagecache/iobuf.rs
@@ -1,3 +1,5 @@
use std::ops::DerefMut;
use std::ptr::{slice_from_raw_parts, slice_from_raw_parts_mut};
use std::{
alloc::{alloc, dealloc, Layout},
cell::UnsafeCell,
@@ -32,6 +34,26 @@ impl AlignedBuf {
}
}

impl Deref for AlignedBuf {
type Target = [u8];

#[inline(always)]
fn deref(&self) -> &Self::Target {
unsafe {
slice_from_raw_parts(self.0, self.1).as_ref().unwrap_unchecked()
}
}
}

impl DerefMut for AlignedBuf {
#[inline(always)]
fn deref_mut(&mut self) -> &mut Self::Target {
unsafe {
slice_from_raw_parts_mut(self.0, self.1).as_mut().unwrap_unchecked()
}
}
}

impl Drop for AlignedBuf {
fn drop(&mut self) {
let layout = Layout::from_size_align(self.1, 8192).unwrap();
@@ -65,8 +87,8 @@ impl IoBuf {
/// uninitialized memory. For this to be correct, we must
/// ensure that:
/// 1. overlapping mutable slices are never created.
/// 2. a read to any subslice of this slice only happens
/// after a write has initialized that memory
/// 2. a read to any subslice of this slice only happens after a write has
/// initialized that memory
///
/// It is intended that the log reservation code guarantees
/// that no two `Reservation` objects will hold overlapping
@@ -81,19 +103,38 @@ impl IoBuf {
/// to meet this requirement.
///
/// The safety of this method was discussed in #1044.
pub(crate) fn get_mut_range(
&self,
at: usize,
len: usize,
) -> &'static mut [u8] {
let buf_ptr = self.buf.get();
#[inline(always)]
pub(crate) fn get_mut_range(&self, at: usize, len: usize) -> &mut [u8] {
unsafe { &mut self.get_mut()[at..at + len] }
}

/// # Safety
///
/// This operation provides access to a mutable buffer of
/// uninitialized memory. For this to be correct, we must
/// ensure that:
/// 1. overlapping mutable slices are never created.
/// 2. a read to any subslice of this slice only happens after a write has
/// initialized that memory
///
/// It is intended that the log reservation code guarantees
/// that no two `Reservation` objects will hold overlapping
/// mutable slices to our io buffer.
///
/// It is intended that the `write_to_log` function only
/// tries to write initialized bytes to the underlying storage.
///
/// It is intended that the `write_to_log` function will
/// initialize any yet-to-be-initialized bytes before writing
/// the buffer to storage. #1040 added logic that was intended
/// to meet this requirement.
///
/// The safety of this method was discussed in #1044.
#[inline(always)]
pub(crate) fn get_mut(&self) -> &mut [u8] {
unsafe {
assert!((*buf_ptr).1 >= at + len);
std::slice::from_raw_parts_mut(
(*buf_ptr).0.add(self.base + at),
len,
)
let buf_ptr = self.buf.get().as_mut().unwrap_unchecked();
&mut buf_ptr[self.base..]
}
}

@@ -119,11 +160,8 @@ impl IoBuf {

#[allow(unsafe_code)]
unsafe {
std::ptr::copy_nonoverlapping(
header_bytes.as_ptr(),
(*self.buf.get()).0,
SEG_HEADER_LEN,
);
let buf = self.buf.get().as_mut().unwrap();
buf[..SEG_HEADER_LEN].copy_from_slice(&header_bytes);
}

// ensure writes to the buffer land after our header.
@@ -687,48 +725,20 @@ impl IoBufs {
unused_space - header_bytes.len()
];

#[allow(unsafe_code)]
unsafe {
std::ptr::copy_nonoverlapping(
header_bytes.as_ptr(),
data.as_mut_ptr(),
header_bytes.len(),
);
std::ptr::copy_nonoverlapping(
padding_bytes.as_ptr(),
data.as_mut_ptr().add(header_bytes.len()),
padding_bytes.len(),
);
}

let (hdat, rem) = data.split_at_mut(header_bytes.len());
hdat.copy_from_slice(&header_bytes);
let (pdat, _) = rem.split_at_mut(padding_bytes.len());
pdat.copy_from_slice(&padding_bytes);
// this has to stay aligned with the hashing
let crc32_arr = u32_to_arr(calculate_message_crc32(
&header_bytes,
&padding_bytes[..pad_len],
));

#[allow(unsafe_code)]
unsafe {
std::ptr::copy_nonoverlapping(
crc32_arr.as_ptr(),
// the crc32 is the first part of the buffer
data.as_mut_ptr(),
std::mem::size_of::<u32>(),
);
}
data[0..4].copy_from_slice(&crc32_arr)
} else if maxed {
// initialize the remainder of this buffer's red zone
let data = iobuf.get_mut_range(bytes_to_write, unused_space);

#[allow(unsafe_code)]
unsafe {
// note: this could use slice::fill() if it stabilizes
std::ptr::write_bytes(
data.as_mut_ptr(),
MessageKind::Corrupted.into(),
unused_space,
);
}
data.fill(MessageKind::Corrupted.into());
}

let total_len = if maxed { capacity } else { bytes_to_write };
@@ -953,7 +963,10 @@ pub(in crate::pagecache) fn make_stable_inner(

while stable < lsn {
if let Err(e) = iobufs.config.global_error() {
error!("bailing out of stabilization code due to detected IO error: {:?}", e);
error!(
"bailing out of stabilization code due to detected IO error: {:?}",
e
);
let intervals = iobufs.intervals.lock();

// having held the mutex makes this linearized
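Most of the iobuf.rs changes hang off the new `Deref`/`DerefMut` impls for `AlignedBuf`: once the allocation can be viewed as a slice, `copy_from_slice`, `split_at_mut`, and `fill` replace the raw-pointer calls. A self-contained sketch of that shape, assuming the buffer is zero-initialized at allocation so the `Deref` impls never expose uninitialized memory (the real `AlignedBuf::new` is not shown in this diff, and the type name here is a stand-in):

use std::alloc::{alloc_zeroed, dealloc, Layout};
use std::ops::{Deref, DerefMut};
use std::ptr::{slice_from_raw_parts, slice_from_raw_parts_mut};

// A page-aligned, zero-initialized heap allocation that hands out slices
// via Deref/DerefMut, so callers can use indexing and slice methods
// instead of ptr::copy_nonoverlapping / ptr::write_bytes.
struct AlignedBytes {
    ptr: *mut u8,
    len: usize,
}

impl AlignedBytes {
    fn new(len: usize) -> AlignedBytes {
        assert!(len > 0);
        let layout = Layout::from_size_align(len, 8192).unwrap();
        // zeroed allocation keeps the Deref impls sound in this sketch
        let ptr = unsafe { alloc_zeroed(layout) };
        assert!(!ptr.is_null());
        AlignedBytes { ptr, len }
    }
}

impl Deref for AlignedBytes {
    type Target = [u8];
    fn deref(&self) -> &[u8] {
        unsafe { &*slice_from_raw_parts(self.ptr, self.len) }
    }
}

impl DerefMut for AlignedBytes {
    fn deref_mut(&mut self) -> &mut [u8] {
        unsafe { &mut *slice_from_raw_parts_mut(self.ptr, self.len) }
    }
}

impl Drop for AlignedBytes {
    fn drop(&mut self) {
        let layout = Layout::from_size_align(self.len, 8192).unwrap();
        unsafe { dealloc(self.ptr, layout) }
    }
}

fn main() {
    let mut buf = AlignedBytes::new(8192);
    let header = [1u8, 2, 3, 4];
    // slice methods stand in for the removed raw-pointer copies and fills
    buf[..header.len()].copy_from_slice(&header);
    buf[header.len()..].fill(0xFF);
    assert_eq!(&buf[..6], &[1u8, 2, 3, 4, 0xFF, 0xFF]);
}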
60 changes: 19 additions & 41 deletions src/pagecache/logger.rs
@@ -1,3 +1,4 @@
#![forbid(unsafe_code)]
use std::fs::File;

use super::{
@@ -421,7 +422,7 @@ impl Log {
return Ok(Reservation {
iobuf,
log: self,
buf: destination,
buf_range: buf_offset..buf_offset + inline_buf_len,
flushed: false,
lsn: reservation_lsn,
pointer,
@@ -571,31 +572,27 @@ impl LogRead {

impl From<[u8; SEG_HEADER_LEN]> for SegmentHeader {
fn from(buf: [u8; SEG_HEADER_LEN]) -> Self {
#[allow(unsafe_code)]
unsafe {
let crc32_header =
arr_to_u32(buf.get_unchecked(0..4)) ^ 0xFFFF_FFFF;
let crc32_header = arr_to_u32(&buf[0..4]) ^ 0xFFFF_FFFF;

let xor_lsn = arr_to_lsn(buf.get_unchecked(4..12));
let lsn = xor_lsn ^ 0x7FFF_FFFF_FFFF_FFFF;
let xor_lsn = arr_to_lsn(&buf[4..12]);
let lsn = xor_lsn ^ 0x7FFF_FFFF_FFFF_FFFF;

let xor_max_stable_lsn = arr_to_lsn(buf.get_unchecked(12..20));
let max_stable_lsn = xor_max_stable_lsn ^ 0x7FFF_FFFF_FFFF_FFFF;
let xor_max_stable_lsn = arr_to_lsn(&buf[12..20]);
let max_stable_lsn = xor_max_stable_lsn ^ 0x7FFF_FFFF_FFFF_FFFF;

let crc32_tested = crc32(&buf[4..20]);
let crc32_tested = crc32(&buf[4..20]);

let ok = crc32_tested == crc32_header;
let ok = crc32_tested == crc32_header;

if !ok {
debug!(
"segment with lsn {} had computed crc {}, \
if !ok {
debug!(
"segment with lsn {} had computed crc {}, \
but stored crc {}",
lsn, crc32_tested, crc32_header
);
}

Self { lsn, max_stable_lsn, ok }
lsn, crc32_tested, crc32_header
);
}

Self { lsn, max_stable_lsn, ok }
}
}

@@ -609,30 +606,11 @@ impl From<SegmentHeader> for [u8; SEG_HEADER_LEN] {
let xor_max_stable_lsn = header.max_stable_lsn ^ 0x7FFF_FFFF_FFFF_FFFF;
let highest_stable_lsn_arr = lsn_to_arr(xor_max_stable_lsn);

#[allow(unsafe_code)]
unsafe {
std::ptr::copy_nonoverlapping(
lsn_arr.as_ptr(),
buf.as_mut_ptr().add(4),
std::mem::size_of::<u64>(),
);
std::ptr::copy_nonoverlapping(
highest_stable_lsn_arr.as_ptr(),
buf.as_mut_ptr().add(12),
std::mem::size_of::<u64>(),
);
}
buf[4..12].copy_from_slice(&lsn_arr);
buf[12..20].copy_from_slice(&highest_stable_lsn_arr);

let crc32 = u32_to_arr(crc32(&buf[4..20]) ^ 0xFFFF_FFFF);

#[allow(unsafe_code)]
unsafe {
std::ptr::copy_nonoverlapping(
crc32.as_ptr(),
buf.as_mut_ptr(),
std::mem::size_of::<u32>(),
);
}
buf[0..4].copy_from_slice(&crc32);

buf
}
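The `SegmentHeader` conversions now use plain range indexing and `copy_from_slice` instead of `get_unchecked` and `ptr::copy_nonoverlapping`. A simplified round-trip sketch of that encode/decode pattern (the 20-byte layout of crc, lsn, and max_stable_lsn follows the diff, but the xor constants and crc computation are omitted for brevity):

use std::convert::TryInto;

const HEADER_LEN: usize = 20;

// Safe encode: write fixed-width fields with copy_from_slice.
fn encode(lsn: i64, max_stable_lsn: i64) -> [u8; HEADER_LEN] {
    let mut buf = [0u8; HEADER_LEN];
    buf[4..12].copy_from_slice(&lsn.to_le_bytes());
    buf[12..20].copy_from_slice(&max_stable_lsn.to_le_bytes());
    // a real header would also write a crc of buf[4..20] into buf[0..4]
    buf
}

// Safe decode: plain range indexing plus try_into, no get_unchecked.
fn decode(buf: [u8; HEADER_LEN]) -> (i64, i64) {
    let lsn = i64::from_le_bytes(buf[4..12].try_into().unwrap());
    let max_stable_lsn = i64::from_le_bytes(buf[12..20].try_into().unwrap());
    (lsn, max_stable_lsn)
}

fn main() {
    let buf = encode(7, 3);
    assert_eq!(decode(buf), (7, 3));
}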
50 changes: 18 additions & 32 deletions src/pagecache/reservation.rs
@@ -1,4 +1,7 @@
#![forbid(unsafe_code)]

use crate::{pagecache::*, *};
use std::ops::Range;

/// A pending log reservation which can be aborted or completed.
/// NB the holder should quickly call `complete` or `abort` as
@@ -8,7 +11,7 @@ use crate::{pagecache::*, *};
pub struct Reservation<'a> {
pub(super) log: &'a Log,
pub(super) iobuf: Arc<IoBuf>,
pub(super) buf: &'a mut [u8],
pub(super) buf_range: Range<usize>,
pub(super) flushed: bool,
pub pointer: DiskPtr,
pub lsn: Lsn,
@@ -28,6 +31,10 @@ impl<'a> Drop for Reservation<'a> {
}

impl<'a> Reservation<'a> {
#[inline]
fn buf(&self) -> &mut [u8] {
&mut self.iobuf.get_mut()[self.buf_range.clone()]
}
/// Cancel the reservation, placing a failed flush on disk, returning
/// the (cancelled) log sequence number and file offset.
pub fn abort(mut self) -> Result<(Lsn, DiskPtr)> {
@@ -72,17 +79,16 @@ impl<'a> Reservation<'a> {
self.lsn
);

let this_buf = self.buf();
if self.lsn == peg_lsn {
// this can happen because high-level tree updates
// may result in no work happening.
self.abort()
} else {
self.buf[4] = MessageKind::BatchManifest.into();
this_buf[4] = MessageKind::BatchManifest.into();
let dst = &mut this_buf[self.header_len..];

let buf = lsn_to_arr(peg_lsn);

let dst = &mut self.buf[self.header_len..];

dst.copy_from_slice(&buf);

let mut intervals = self.log.iobufs.intervals.lock();
@@ -99,45 +105,25 @@ impl<'a> Reservation<'a> {
}

self.flushed = true;
let buf = self.buf();

if !valid {
self.buf[4] = MessageKind::Canceled.into();
buf[4] = MessageKind::Canceled.into();

// zero the message contents to prevent UB
#[allow(unsafe_code)]
unsafe {
std::ptr::write_bytes(
self.buf[self.header_len..].as_mut_ptr(),
0,
self.buf.len() - self.header_len,
)
}
buf[self.header_len..].fill(0);
}

// zero the crc bytes to prevent UB
#[allow(unsafe_code)]
unsafe {
std::ptr::write_bytes(
self.buf[..].as_mut_ptr(),
0,
std::mem::size_of::<u32>(),
)
}
buf[0..4].fill(0);

let crc32 = calculate_message_crc32(
self.buf[..self.header_len].as_ref(),
&self.buf[self.header_len..],
&buf[..self.header_len],
&buf[self.header_len..],
);
let crc32_arr = u32_to_arr(crc32);

#[allow(unsafe_code)]
unsafe {
std::ptr::copy_nonoverlapping(
crc32_arr.as_ptr(),
self.buf.as_mut_ptr(),
std::mem::size_of::<u32>(),
);
}
buf[0..4].copy_from_slice(&crc32_arr);
self.log.exit_reservation(&self.iobuf)?;

Ok((self.lsn, self.pointer))
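The `Reservation` change replaces the stored `&'a mut [u8]` with a `buf_range: Range<usize>` plus a `buf()` accessor that re-borrows from the io buffer on demand. A sketch of the idea with simplified ownership, since the real accessor goes through `IoBuf::get_mut` (the types and field names here are hypothetical):

use std::ops::Range;

// The "log" owns the buffer outright in this sketch to keep it safe and
// self-contained; the reservation remembers only which window it reserved.
struct LogSketch {
    buf: Vec<u8>,
}

struct ReservationSketch {
    buf_range: Range<usize>,
    header_len: usize,
}

impl ReservationSketch {
    fn complete(&self, log: &mut LogSketch, payload: &[u8]) {
        // borrow the reserved window only for the duration of the write,
        // instead of holding a mutable slice for the reservation's lifetime
        let dst = &mut log.buf[self.buf_range.clone()];
        dst[..self.header_len].fill(0); // header placeholder
        dst[self.header_len..].copy_from_slice(payload);
    }
}

fn main() {
    let mut log = LogSketch { buf: vec![0; 16] };
    let res = ReservationSketch { buf_range: 4..12, header_len: 4 };
    res.complete(&mut log, &[9u8, 9, 9, 9]);
    assert_eq!(&log.buf[8..12], &[9u8, 9, 9, 9]);
}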