fix: allocate shard metadata lazily (#45)
Currently, creating a new `Slab` allocates a shards array of `Shard`
structs. The `Shard` struct itself owns two boxed arrays of local and
shared metadata for each page on that shard. Even though we don't
allocate the actual storage arrays for those pages until they are
needed, allocating the shard metadata eagerly means that a completely
empty slab results in a fairly large memory allocation up front. This is
especially the case when used with the default `Config`, which (on
64-bit machines) allows up to 4096 threads. On a 64-bit machine, the
`Shared` page metadata is 4 words, so 32 bytes, and the `Local` metadata
is another word, so 8 bytes. 40 bytes * 32 pages per shard = 1280 bytes,
which is a little over 1 KiB per shard. This means that the default
config eagerly allocates 4096 shards * 1280 bytes, or about 5 MB of
metadata, even when the program only has one or two threads in it, and
the remaining 4000-some possible threads will never allocate their
shards.
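
For concreteness, here is that arithmetic as a small standalone sketch; the constants mirror the description above (default config on a 64-bit machine), not measured values:

```rust
// Back-of-the-envelope sketch of the eager-allocation cost described above.
const WORD: usize = 8; // bytes per word on a 64-bit machine
const SHARED_PER_PAGE: usize = 4 * WORD; // `Shared` page metadata: 4 words
const LOCAL_PER_PAGE: usize = WORD; // `Local` page metadata: 1 word
const PAGES_PER_SHARD: usize = 32;
const MAX_SHARDS: usize = 4096; // default config, 64-bit

fn main() {
    let per_shard = (SHARED_PER_PAGE + LOCAL_PER_PAGE) * PAGES_PER_SHARD;
    let total = per_shard * MAX_SHARDS;
    println!("{} bytes per shard", per_shard); // 1280 bytes
    println!("{} bytes total", total); // 5242880 bytes, ~5 MiB
}
```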

When most of the shards are empty because there are very few threads in
the program, most of this allocated memory is never actually *resident*,
since the operating system won't back untouched pages with physical
memory, but it results in a very surprising
amount of allocated virtual memory. This is the cause of issues like
tokio-rs/tracing#1005.

Furthermore, allocating all of this means that actually _constructing_ a
slab takes a pretty long time. In `tracing-subscriber`, this is normally
not a major issue, since subscribers tend to be created on startup and
live for the entire lifetime of the program. However, in some use-cases,
like creating a separate subscriber for each test, the performance
impact of allocating all that metadata is quite significant. See, for
example:
rust-lang/rust-analyzer#5792 (comment)

This branch fixes this by allocating the shard metadata only when a new
shard is actually needed by a new thread. The shard array is now an
array of `AtomicPtr`s to shards, and shards are only allocated the first
time they are `insert`ed to. Since each thread can only insert to its
own shard, the synchronization logic for this is fairly simple. However,
since the shards are morally, although not actually, _owned_ by these
`AtomicPtr`s, there is the potential for leaks when a slab is dropped
if we don't ensure that all the shards it created are also dropped.
Therefore, we use `loom::alloc::Track` for leak detection in tests.
Fortunately, the logic for ensuring these are deallocated is not too
complex.
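
As a rough illustration of the pattern, here is a minimal sketch with simplified, hypothetical names (`ShardArray`, `get_or_alloc`), omitting the `loom` instrumentation and the crate's real invariants; it is not the code in this commit:

```rust
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};

struct Shard {
    id: usize, // stand-in for the real per-shard metadata
}

struct ShardArray {
    shards: Box<[AtomicPtr<Shard>]>,
}

impl ShardArray {
    fn new(max_shards: usize) -> Self {
        // Up front, we only pay for an array of null pointers.
        let shards = (0..max_shards)
            .map(|_| AtomicPtr::new(ptr::null_mut()))
            .collect();
        Self { shards }
    }

    /// Returns the shard at `idx`, allocating it on first use. Only the
    /// owning thread ever stores to its own index, so a plain store after
    /// a null check suffices in this simplified scheme.
    fn get_or_alloc(&self, idx: usize) -> &Shard {
        let ptr = self.shards[idx].load(Ordering::Acquire);
        if !ptr.is_null() {
            return unsafe { &*ptr };
        }
        let new = Box::into_raw(Box::new(Shard { id: idx }));
        self.shards[idx].store(new, Ordering::Release);
        unsafe { &*new }
    }
}

impl Drop for ShardArray {
    fn drop(&mut self) {
        // The `AtomicPtr`s morally own the shards, so every shard that was
        // actually allocated must be reclaimed here to avoid a leak.
        for shard in self.shards.iter_mut() {
            let ptr = *shard.get_mut();
            if !ptr.is_null() {
                drop(unsafe { Box::from_raw(ptr) });
            }
        }
    }
}
```

With this shape, a `ShardArray::new(4096)` only allocates 4096 pointers (32 KiB) up front; each shard's metadata is allocated the first time its owning thread touches its index.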

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
hawkw authored Oct 15, 2020
1 parent cd59d8c commit 7683b8b
Showing 6 changed files with 250 additions and 26 deletions.
14 changes: 12 additions & 2 deletions src/iter.rs
@@ -1,26 +1,36 @@
use crate::{page, Shard};
use crate::{page, shard};
use std::slice;

pub struct UniqueIter<'a, T, C: crate::cfg::Config> {
pub(super) shards: slice::IterMut<'a, Shard<Option<T>, C>>,
pub(super) shards: shard::IterMut<'a, Option<T>, C>,
pub(super) pages: slice::Iter<'a, page::Shared<Option<T>, C>>,
pub(super) slots: Option<page::Iter<'a, T, C>>,
}

impl<'a, T, C: crate::cfg::Config> Iterator for UniqueIter<'a, T, C> {
type Item = &'a T;
fn next(&mut self) -> Option<Self::Item> {
test_println!("UniqueIter::next");
loop {
test_println!("-> try next slot");
if let Some(item) = self.slots.as_mut().and_then(|slots| slots.next()) {
test_println!("-> found an item!");
return Some(item);
}

test_println!("-> try next page");
if let Some(page) = self.pages.next() {
test_println!("-> found another page");
self.slots = page.iter();
continue;
}

test_println!("-> try next shard");
if let Some(shard) = self.shards.next() {
test_println!("-> found another shard");
self.pages = shard.iter();
} else {
test_println!("-> all done!");
return None;
}
}
11 changes: 5 additions & 6 deletions src/lib.rs
@@ -236,7 +236,7 @@ use std::{fmt, marker::PhantomData};
///
/// See the [crate-level documentation](index.html) for details on using this type.
pub struct Slab<T, C: cfg::Config = DefaultConfig> {
shards: Box<[Shard<Option<T>, C>]>,
shards: shard::Array<Option<T>, C>,
_cfg: PhantomData<C>,
}

@@ -260,9 +260,8 @@ impl<T> Slab<T> {
/// Returns a new slab with the provided configuration parameters.
pub fn new_with_config<C: cfg::Config>() -> Slab<T, C> {
C::validate();
let shards = (0..C::MAX_SHARDS).map(Shard::new).collect();
Slab {
shards,
shards: shard::Array::new(),
_cfg: PhantomData,
}
}
@@ -300,10 +299,10 @@ impl<T, C: cfg::Config> Slab<T, C> {
/// assert_eq!(slab.get(key).unwrap(), "hello world");
/// ```
pub fn insert(&self, value: T) -> Option<usize> {
let tid = Tid::<C>::current();
let (tid, shard) = self.shards.current();
test_println!("insert {:?}", tid);
let mut value = Some(value);
self.shards[tid.as_usize()]
shard
.init_with(|slot| slot.insert(&mut value))
.map(|idx| tid.pack(idx))
}
@@ -424,7 +423,7 @@ impl<T, C: cfg::Config> Slab<T, C> {
let tid = C::unpack_tid(idx);

test_println!("rm {:?}", tid);
let shard = &self.shards[tid.as_usize()];
let shard = self.shards.get(tid.as_usize())?;
if tid.is_current() {
shard.take_local(idx)
} else {
13 changes: 8 additions & 5 deletions src/page/slot.rs
@@ -327,17 +327,20 @@ where

// Set the slot's state to NOT_REMOVED.
let new_lifecycle = gen.pack(Lifecycle::<C>::NOT_REMOVED.pack(0));
let actual = self
.lifecycle
.compare_and_swap(lifecycle, new_lifecycle, Ordering::AcqRel);
if actual != lifecycle {
let was_set = self.lifecycle.compare_exchange(
lifecycle,
new_lifecycle,
Ordering::AcqRel,
Ordering::Acquire,
);
if let Err(_actual) = was_set {
// The slot was modified while we were inserting to it! It's no
// longer safe to insert a new value.
test_println!(
"-> modified during insert, cancelling! new={:#x}; expected={:#x}; actual={:#x};",
new_lifecycle,
lifecycle,
actual
_actual
);
return None;
}
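
As context for the hunk above: std's `compare_and_swap` has since been deprecated in favor of `compare_exchange`, which takes separate success and failure orderings and returns a `Result` instead of the previous value. A standalone illustration of the newer API (not this crate's code):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

fn main() {
    let lifecycle = AtomicUsize::new(0);
    // `compare_exchange` succeeds only if the current value is still 0; on
    // failure, the `Err` variant carries the value actually observed.
    match lifecycle.compare_exchange(0, 1, Ordering::AcqRel, Ordering::Acquire) {
        Ok(prev) => assert_eq!(prev, 0), // the swap took effect
        Err(actual) => println!("state changed concurrently: {:#x}", actual),
    }
}
```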
11 changes: 5 additions & 6 deletions src/pool.rs
@@ -1,7 +1,7 @@
use crate::{
cfg::{self, CfgPrivate, DefaultConfig},
clear::Clear,
page,
page, shard,
tid::Tid,
Pack, Shard,
};
@@ -66,7 +66,7 @@ where
T: Clear + Default,
C: cfg::Config,
{
shards: Box<[Shard<T, C>]>,
shards: shard::Array<T, C>,
_cfg: PhantomData<C>,
}

@@ -81,9 +81,8 @@ where
/// Returns a new `Pool` with the provided configuration parameters.
pub fn new_with_config<C: cfg::Config>() -> Pool<T, C> {
C::validate();
let shards = (0..C::MAX_SHARDS).map(Shard::new).collect();
Pool {
shards,
shards: shard::Array::new(),
_cfg: PhantomData,
}
}
@@ -137,10 +136,10 @@ where
/// assert_eq!(pool.get(key).unwrap(), String::from("Hello"));
/// ```
pub fn create(&self, initializer: impl FnOnce(&mut T)) -> Option<usize> {
let tid = Tid::<C>::current();
let (tid, shard) = self.shards.current();
let mut init = Some(initializer);
test_println!("pool: create {:?}", tid);
self.shards[tid.as_usize()]
shard
.init_with(|slot| {
let init = init.take().expect("initializer will only be called once");
slot.initialize_state(init)
181 changes: 180 additions & 1 deletion src/shard.rs
@@ -2,11 +2,18 @@ use crate::{
cfg::{self, CfgPrivate},
clear::Clear,
page,
sync::{
alloc,
atomic::{
AtomicPtr, AtomicUsize,
Ordering::{self, *},
},
},
tid::Tid,
Pack,
};

use std::fmt;
use std::{fmt, ptr, slice};

// ┌─────────────┐ ┌────────┐
// │ page 1 │ │ │
@@ -43,6 +50,17 @@ pub(crate) struct Shard<T, C: cfg::Config> {
shared: Box<[page::Shared<T, C>]>,
}

pub(crate) struct Array<T, C: cfg::Config> {
shards: Box<[Ptr<T, C>]>,
max: AtomicUsize,
}

struct Ptr<T, C: cfg::Config>(AtomicPtr<alloc::Track<Shard<T, C>>>);

pub(crate) struct IterMut<'a, T: 'a, C: cfg::Config + 'a>(slice::IterMut<'a, Ptr<T, C>>);

// === impl Shard ===

impl<T, C> Shard<T, C>
where
C: cfg::Config,
@@ -204,3 +222,164 @@ impl<T: fmt::Debug, C: cfg::Config> fmt::Debug for Shard<T, C> {
d.field("shared", &self.shared).finish()
}
}

// === impl Array ===

impl<T, C> Array<T, C>
where
C: cfg::Config,
{
pub(crate) fn new() -> Self {
let mut shards = Vec::with_capacity(C::MAX_SHARDS);
for _ in 0..C::MAX_SHARDS {
// XXX(eliza): T_T this could be avoided with maybeuninit or something...
shards.push(Ptr::null());
}
Self {
shards: shards.into(),
max: AtomicUsize::new(0),
}
}

#[inline]
pub(crate) fn get<'a>(&'a self, idx: usize) -> Option<&'a Shard<T, C>> {
test_println!("-> get shard={}", idx);
self.shards.get(idx)?.load(Acquire)
}

#[inline]
pub(crate) fn current<'a>(&'a self) -> (Tid<C>, &'a Shard<T, C>) {
let tid = Tid::<C>::current();
test_println!("current: {:?}", tid);
let idx = tid.as_usize();
// It's okay for this to be relaxed. The value is only ever stored by
// the thread that corresponds to the index, and we are that thread.
let shard = self.shards[idx].load(Relaxed).unwrap_or_else(|| {
let ptr = Box::into_raw(Box::new(alloc::Track::new(Shard::new(idx))));
test_println!("-> allocated new shard for index {} at {:p}", idx, ptr);
self.shards[idx].set(ptr);
let mut max = self.max.load(Acquire);
while max < idx {
match self.max.compare_exchange(max, idx, AcqRel, Acquire) {
Ok(_) => break,
Err(actual) => max = actual,
}
}
test_println!("-> highest index={}, prev={}", std::cmp::max(max, idx), max);
unsafe {
// Safety: we just put it there!
&*ptr
}
.get_ref()
});
(tid, shard)
}

pub(crate) fn iter_mut(&mut self) -> IterMut<'_, T, C> {
test_println!("Array::iter_mut");
let max = self.max.load(Acquire);
test_println!("-> highest index={}", max);
IterMut(self.shards[0..=max].iter_mut())
}
}

impl<T, C: cfg::Config> Drop for Array<T, C> {
fn drop(&mut self) {
// XXX(eliza): this could be `with_mut` if we wanted to impl a wrapper for std atomics to change `get_mut` to `with_mut`...
let max = self.max.load(Acquire);
for shard in &self.shards[0..=max] {
// XXX(eliza): this could be `with_mut` if we wanted to impl a wrapper for std atomics to change `get_mut` to `with_mut`...
let ptr = shard.0.load(Acquire);
if ptr.is_null() {
continue;
}
let shard = unsafe {
// Safety: this is the only place where these boxes are
// deallocated, and we have exclusive access to the shard array,
// because...we are dropping it...
Box::from_raw(ptr)
};
drop(shard)
}
}
}

impl<T: fmt::Debug, C: cfg::Config> fmt::Debug for Array<T, C> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let max = self.max.load(Acquire);
let mut set = f.debug_map();
for shard in &self.shards[0..=max] {
let ptr = shard.0.load(Acquire);
if let Some(shard) = ptr::NonNull::new(ptr) {
set.entry(&format_args!("{:p}", ptr), unsafe { shard.as_ref() });
} else {
set.entry(&format_args!("{:p}", ptr), &());
}
}
set.finish()
}
}

// === impl Ptr ===

impl<T, C: cfg::Config> Ptr<T, C> {
#[inline]
fn null() -> Self {
Self(AtomicPtr::new(ptr::null_mut()))
}

#[inline]
fn load(&self, order: Ordering) -> Option<&Shard<T, C>> {
let ptr = self.0.load(order);
test_println!("---> loaded={:p} (order={:?})", ptr, order);
if ptr.is_null() {
test_println!("---> null");
return None;
}
let track = unsafe {
// Safety: The returned reference will have the same lifetime as the
// reference to the shard pointer, which (morally, if not actually)
// owns the shard. The shard is only deallocated when the shard
// array is dropped, and it won't be dropped while this pointer is
// borrowed --- and the returned reference has the same lifetime.
//
// We know that the pointer is not null, because we just
// null-checked it immediately prior.
&*ptr
};

Some(track.get_ref())
}

#[inline]
fn set(&self, new: *mut alloc::Track<Shard<T, C>>) {
self.0
.compare_exchange(ptr::null_mut(), new, AcqRel, Acquire)
.expect("a shard can only be inserted by the thread that owns it, this is a bug!");
}
}

// === Iterators ===

impl<'a, T, C> Iterator for IterMut<'a, T, C>
where
T: 'a,
C: cfg::Config + 'a,
{
type Item = &'a Shard<T, C>;
fn next(&mut self) -> Option<Self::Item> {
test_println!("IterMut::next");
loop {
// Skip over empty indices if they are less than the highest
// allocated shard. Some threads may have accessed the slab
// (generating a thread ID) but never actually inserted data, so
// they may have never allocated a shard.
let next = self.0.next();
test_println!("-> next.is_some={}", next.is_some());
if let Some(shard) = next?.load(Acquire) {
test_println!("-> done");
return Some(shard);
}
}
}
}