Commit ae51601
Introduce OnceVec<T> primitive and use it for AllocId caches
This significantly reduces contention when running miri under -Zthreads, allowing us to scale to roughly 30 cores (up from ~7-8 without this patch).
1 parent 203e6c1 commit ae51601
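
As a rough sketch of the API this commit adds (illustration only, not code from the diff; the AllocId cache wiring lives in the other changed files): `insert` publishes a value for an index at most once, and `get` is a lock-free read that returns `None` until a value has been fully written.

// Sketch only: exercising the OnceVec API introduced below. Assumes the
// re-export `rustc_data_structures::sync::OnceVec` (added in sync.rs below)
// is available, i.e. this runs inside the rustc workspace.
use rustc_data_structures::sync::OnceVec;

fn demo() {
    let cache: OnceVec<String> = OnceVec::default();

    // The first insert for an index wins; a later insert hands the value back as Err.
    cache.insert(0, "first".to_owned()).unwrap();
    assert!(cache.insert(0, "second".to_owned()).is_err());

    // Reads never take a lock and return None for slots that were never filled.
    assert_eq!(cache.get(0).map(String::as_str), Some("first"));
    assert!(cache.get(1).is_none());
}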

File tree

5 files changed

+351 −24 lines changed

compiler/rustc_data_structures/src/sync.rs

+3
@@ -59,6 +59,9 @@ pub use vec::{AppendOnlyIndexVec, AppendOnlyVec};
 
 mod vec;
 
+mod once_vec;
+pub use once_vec::OnceVec;
+
 mod freeze;
 pub use freeze::{FreezeLock, FreezeReadGuard, FreezeWriteGuard};

compiler/rustc_data_structures/src/sync/once_vec.rs (new file; path inferred from `mod once_vec;` above)

@@ -0,0 +1,229 @@
use std::alloc::Layout;
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::ptr::NonNull;
use std::sync::Mutex;
use std::sync::atomic::{AtomicPtr, AtomicU8, Ordering};

/// Provides a singly-settable Vec.
///
/// This provides amortized, concurrent O(1) access to &T, expecting a densely numbered key space
/// (all value slots are allocated up to the highest key inserted).
pub struct OnceVec<T> {
    // Provide storage for up to 2^36 - 1 elements (36 slabs with capacities 1, 2, 4, ..., 2^35),
    // which we expect to be enough in practice -- but it can be increased if needed. We may want
    // to make the `slabs` list dynamic itself, likely by indirecting through one more pointer to
    // reduce space consumption of OnceVec if this grows much larger.
    //
    // None of the code makes assumptions based on this size so bumping it up is easy.
    slabs: [Slab<T>; 36],
}

impl<T> Default for OnceVec<T> {
    fn default() -> Self {
        OnceVec { slabs: [const { Slab::new() }; 36] }
    }
}

unsafe impl<#[may_dangle] T> Drop for OnceVec<T> {
    fn drop(&mut self) {
        for (idx, slab) in self.slabs.iter_mut().enumerate() {
            unsafe { slab.deallocate(1 << idx) }
        }
    }
}

impl<T> OnceVec<T> {
    #[inline]
    fn to_slab_args(idx: usize) -> (usize, usize, usize) {
        let slab_idx = (idx + 1).ilog2() as usize;
        let cap = 1 << slab_idx;
        let idx_in_slab = idx - (cap - 1);
        (slab_idx, cap, idx_in_slab)
    }

    pub fn insert(&self, idx: usize, value: T) -> Result<(), T> {
        let (slab_idx, cap, idx_in_slab) = Self::to_slab_args(idx);
        self.slabs[slab_idx].insert(cap, idx_in_slab, value)
    }

    pub fn get(&self, idx: usize) -> Option<&T> {
        let (slab_idx, cap, idx_in_slab) = Self::to_slab_args(idx);
        self.slabs[slab_idx].get(cap, idx_in_slab)
    }
}

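To make the index math above concrete, here is a small self-contained illustration (not part of the commit) of how a global index maps to (slab_idx, cap, idx_in_slab): slab k holds the 2^k indices in the range [2^k - 1, 2^(k+1) - 2].

// Illustration only: mirrors OnceVec::to_slab_args from the diff above.
fn to_slab_args(idx: usize) -> (usize, usize, usize) {
    let slab_idx = (idx + 1).ilog2() as usize;
    let cap = 1 << slab_idx;
    (slab_idx, cap, idx - (cap - 1))
}

fn main() {
    assert_eq!(to_slab_args(0), (0, 1, 0)); // slab 0 holds only index 0
    assert_eq!(to_slab_args(1), (1, 2, 0)); // slab 1 holds indices 1..=2
    assert_eq!(to_slab_args(2), (1, 2, 1));
    assert_eq!(to_slab_args(6), (2, 4, 3)); // slab 2 holds indices 3..=6
    assert_eq!(to_slab_args(7), (3, 8, 0)); // slab 3 starts at index 7
}
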
struct Slab<T> {
    // If non-zero, points to a contiguously allocated block which starts with a bitset
    // (two bits per value, one for whether a value is present and the other for whether a value is
    // currently being written) and then `[T]` (some of which may be missing).
    //
    // The capacity is implicit and passed with all accessors.
    v: AtomicPtr<u8>,
    _phantom: PhantomData<[T; 1]>,
}

impl<T> Slab<T> {
    const fn new() -> Slab<T> {
        Slab { v: AtomicPtr::new(std::ptr::null_mut()), _phantom: PhantomData }
    }

    fn initialize(&self, cap: usize) -> NonNull<u8> {
        static LOCK: Mutex<()> = Mutex::new(());

        if let Some(ptr) = NonNull::new(self.v.load(Ordering::Acquire)) {
            return ptr;
        }

        // If we are initializing the slab, then acquire a global lock.
        //
        // This path is quite cold, so it's cheap to use a global lock. This ensures that we never
        // have multiple allocations for the same slab.
        let _allocator_guard = LOCK.lock().unwrap_or_else(|e| e.into_inner());

        // Check the pointer again, since we might have been initialized while waiting on the lock.
        if let Some(ptr) = NonNull::new(self.v.load(Ordering::Acquire)) {
            return ptr;
        }

        let layout = Self::layout(cap).0;
        assert!(layout.size() > 0);

        // SAFETY: Checked above that layout is non-zero sized.
        let Some(allocation) = NonNull::new(unsafe { std::alloc::alloc_zeroed(layout) }) else {
            std::alloc::handle_alloc_error(layout);
        };

        self.v.store(allocation.as_ptr(), Ordering::Release);

        allocation
    }

    fn bitset(ptr: NonNull<u8>, cap: usize) -> NonNull<[AtomicU8]> {
        NonNull::slice_from_raw_parts(ptr.cast(), cap.div_ceil(4))
    }

    // SAFETY: Must be called on an `initialize`d `ptr` for this capacity.
    unsafe fn slice(ptr: NonNull<u8>, cap: usize) -> NonNull<[MaybeUninit<T>]> {
        let offset = Self::layout(cap).1;
        // SAFETY: Passed up to caller.
        NonNull::slice_from_raw_parts(unsafe { ptr.add(offset).cast() }, cap)
    }

    // idx is already compacted to within this slab
    fn get(&self, cap: usize, idx: usize) -> Option<&T> {
        // Avoid initializing the slab for get queries.
        let Some(ptr) = NonNull::new(self.v.load(Ordering::Acquire)) else {
            return None;
        };

        let bitset = unsafe { Self::bitset(ptr, cap).as_ref() };

        // Check if the entry is initialized.
        //
        // Bottom 4 bits are the "is initialized" bits, top 4 bits are used for the "is
        // initializing" lock.
        let word = bitset[idx / 4].load(Ordering::Acquire);
        if word & (1 << (idx % 4)) == 0 {
            return None;
        }

        // Avoid as_ref() since we don't want to assert shared refs to all slots (some are being
        // concurrently updated).
        //
        // SAFETY: `ptr` is only written by `initialize`, so this is safe.
        let slice = unsafe { Self::slice(ptr, cap) };
        assert!(idx < slice.len());
        // SAFETY: assertion above checks that we're in-bounds.
        let slot = unsafe { slice.cast::<T>().add(idx) };

        // SAFETY: We checked `bitset` and this value was initialized. Our Acquire load
        // establishes the memory ordering with the release store which set the bit, so we're safe
        // to read it.
        Some(unsafe { slot.as_ref() })
    }

    // idx is already compacted to within this slab
    fn insert(&self, cap: usize, idx: usize, value: T) -> Result<(), T> {
        let ptr = self.initialize(cap);
        let bitset = unsafe { Self::bitset(ptr, cap).as_ref() };

        // Check if the entry is initialized, and lock it for writing.
        let word = bitset[idx / 4].fetch_or(1 << (4 + idx % 4), Ordering::AcqRel);
        if word & (1 << (idx % 4)) != 0 {
            // Already fully initialized prior to us setting the "is writing" bit.
            return Err(value);
        }
        if word & (1 << (4 + idx % 4)) != 0 {
            // Someone else already acquired the lock for writing.
            return Err(value);
        }

        let slice = unsafe { Self::slice(ptr, cap) };
        assert!(idx < slice.len());
        // SAFETY: assertion above checks that we're in-bounds.
        let slot = unsafe { slice.cast::<T>().add(idx) };

        // SAFETY: We locked this slot for writing with the fetch_or above, and were the first to
        // do so (checked in the 2nd `if` above).
        unsafe {
            slot.write(value);
        }

        // Set the is-present bit, indicating that we have finished writing this value.
        // Acquire ensures we don't break synchronizes-with relationships in other bits (unclear if
        // strictly necessary but definitely doesn't hurt).
        bitset[idx / 4].fetch_or(1 << (idx % 4), Ordering::AcqRel);

        Ok(())
    }

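For reference, a small illustration (not part of the commit) of the two-bits-per-slot encoding that get and insert use: the byte at bitset[idx / 4] tracks four slots, with the low nibble holding the "value present" bits and the high nibble holding the "write locked" bits.

// Illustration only: the bit masks used for slot `idx` within its bitset byte.
fn slot_masks(idx: usize) -> (u8, u8) {
    let present = 1u8 << (idx % 4); // low nibble: value fully written and published
    let writing = 1u8 << (4 + idx % 4); // high nibble: write lock claimed via fetch_or
    (present, writing)
}

fn main() {
    assert_eq!(slot_masks(0), (0b0000_0001, 0b0001_0000));
    assert_eq!(slot_masks(3), (0b0000_1000, 0b1000_0000));
    assert_eq!(slot_masks(5), (0b0000_0010, 0b0010_0000)); // shares a byte with slots 4..=7
}
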
    /// Returns the layout for a Slab with capacity for `cap` elements, and the offset into the
    /// allocation at which the T slice starts.
    fn layout(cap: usize) -> (Layout, usize) {
        Layout::array::<AtomicU8>(cap.div_ceil(4))
            .unwrap()
            .extend(Layout::array::<T>(cap).unwrap())
            .unwrap()
    }

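As a concrete illustration (not part of the commit) of what that layout computation produces, here is the allocation for a slab of four u64 values: a one-byte bitset, padding up to the value alignment, then the value slice.

use std::alloc::Layout;
use std::sync::atomic::AtomicU8;

// Illustration only: for cap = 4 and T = u64 the combined allocation is
// { 1 bitset byte, 7 padding bytes, [u64; 4] }, so the value slice starts at offset 8.
fn main() {
    let (layout, offset) = Layout::array::<AtomicU8>(4usize.div_ceil(4))
        .unwrap()
        .extend(Layout::array::<u64>(4).unwrap())
        .unwrap();
    assert_eq!(offset, 8);
    assert_eq!(layout.align(), 8);
    assert_eq!(layout.size(), 40);
}
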
    // Drop, except passing the capacity
    unsafe fn deallocate(&mut self, cap: usize) {
        // Avoid initializing just to drop.
        let Some(ptr) = NonNull::new(self.v.load(Ordering::Acquire)) else {
            return;
        };

        if std::mem::needs_drop::<T>() {
            // SAFETY: `ptr` is only written by `initialize`, and zero-init'd so AtomicU8 is
            // present in the bitset range.
            let bitset = unsafe { Self::bitset(ptr, cap).as_ref() };
            // SAFETY: `ptr` is only written by `initialize`, so satisfies slice precondition.
            let slice = unsafe { Self::slice(ptr, cap).cast::<T>() };

            for (word_idx, word) in bitset.iter().enumerate() {
                let word = word.load(Ordering::Acquire);
                for word_offset in 0..4 {
                    if word & (1 << word_offset) != 0 {
                        // Was initialized, need to drop the value.
                        let idx = word_idx * 4 + word_offset;
                        unsafe {
                            std::ptr::drop_in_place(slice.add(idx).as_ptr());
                        }
                    }
                }
            }
        }

        let layout = Self::layout(cap).0;

        // SAFETY: Allocated with `alloc_zeroed` and the same layout.
        unsafe {
            std::alloc::dealloc(ptr.as_ptr(), layout);
        }
    }
}

#[cfg(test)]
mod test;
compiler/rustc_data_structures/src/sync/once_vec/test.rs (new file; path inferred from `mod test;` above)

@@ -0,0 +1,83 @@
use super::*;

#[test]
#[cfg(not(miri))]
fn empty() {
    let cache: OnceVec<u32> = OnceVec::default();
    for key in 0..u32::MAX {
        assert!(cache.get(key as usize).is_none());
    }
}

#[test]
fn insert_and_check() {
    let cache: OnceVec<usize> = OnceVec::default();
    for idx in 0..100 {
        cache.insert(idx, idx).unwrap();
    }
    for idx in 0..100 {
        assert_eq!(cache.get(idx), Some(&idx));
    }
}

#[test]
fn sparse_inserts() {
    let cache: OnceVec<u32> = OnceVec::default();
    let end = if cfg!(target_pointer_width = "64") && cfg!(target_os = "linux") {
        // On 64-bit systems with demand-paged memory we should be able to sparsely allocate all
        // of the pages needed for these inserts cheaply (without needing to actually have
        // gigabytes of resident memory).
        31
    } else {
        // Otherwise, still run the test but scaled back:
        //
        // Each slot is <5 bytes, so 2^25 entries (on systems without lazily committed memory,
        // e.g. Windows) mean roughly 160 megabytes of allocated memory. Going beyond that is
        // probably not reasonable for tests.
        25
    };
    for shift in 0..end {
        let key = 1u32 << shift;
        cache.insert(key as usize, shift).unwrap();
        assert_eq!(cache.get(key as usize), Some(&shift));
    }
}

#[test]
fn concurrent_stress_check() {
    let cache: OnceVec<usize> = OnceVec::default();
    std::thread::scope(|s| {
        for idx in 0..100 {
            let cache = &cache;
            s.spawn(move || {
                cache.insert(idx, idx).unwrap();
            });
        }
    });

    for idx in 0..100 {
        assert_eq!(cache.get(idx), Some(&idx));
    }
}

#[test]
#[cfg(not(miri))]
fn slot_index_exhaustive() {
    let mut prev = None::<(usize, usize, usize)>;
    for idx in 0..=u32::MAX as usize {
        let slot_idx = OnceVec::<()>::to_slab_args(idx);
        if let Some(p) = prev {
            if p.0 == slot_idx.0 {
                assert_eq!(p.2 + 1, slot_idx.2);
            } else {
                assert_eq!(slot_idx.2, 0);
            }
        } else {
            assert_eq!(idx, 0);
            assert_eq!(slot_idx.2, 0);
            assert_eq!(slot_idx.0, 0);
        }

        prev = Some(slot_idx);
    }
}
