Commit 5c9f168

feat(profiling): parallel set and string set
1 parent 3e1bd42 commit 5c9f168

File tree

11 files changed: +1306 -4 lines changed


Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default.

libdd-profiling/Cargo.toml

Lines changed: 2 additions & 0 deletions
@@ -27,6 +27,7 @@ bitmaps = "3.2.0"
 byteorder = { version = "1.5", features = ["std"] }
 bytes = "1.1"
 chrono = {version = "0.4", default-features = false, features = ["std", "clock"]}
+crossbeam-utils = { version = "0.8.21" }
 libdd-alloc = { version = "1.0.0", path = "../libdd-alloc" }
 libdd-profiling-protobuf = { version = "1.0.0", path = "../libdd-profiling-protobuf", features = ["prost_impls"] }
 libdd-common = { version = "1.0.0", path = "../libdd-common" }
@@ -39,6 +40,7 @@ hyper-multipart-rfc7578 = "0.9.0"
 indexmap = "2.11"
 lz4_flex = { version = "0.9", default-features = false, features = ["std", "safe-encode", "frame"] }
 mime = "0.3.16"
+parking_lot = { version = "0.12", default-features = false }
 prost = "0.13.5"
 rustc-hash = { version = "1.1", default-features = false }
 serde = {version = "1.0", features = ["derive"]}
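
Both additions support the new parallel collections: `crossbeam-utils` provides `CachePadded` (used by the new `Arc` below to keep the refcount and data on separate cache lines), and `parking_lot` presumably provides the locking for the parallel sets. As a minimal illustration of what `CachePadded` buys — a sketch, not part of this commit:

use core::sync::atomic::{AtomicUsize, Ordering};
use crossbeam_utils::CachePadded;

// Two counters updated by different threads: CachePadded aligns each to
// its own cache line, so writes to one do not invalidate the other.
struct Counters {
    hits: CachePadded<AtomicUsize>,
    misses: CachePadded<AtomicUsize>,
}

fn main() {
    let c = Counters {
        hits: CachePadded::new(AtomicUsize::new(0)),
        misses: CachePadded::new(AtomicUsize::new(0)),
    };
    c.hits.fetch_add(1, Ordering::Relaxed);
    assert_eq!(c.hits.load(Ordering::Relaxed), 1);
    assert_eq!(c.misses.load(Ordering::Relaxed), 0);
}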

libdd-profiling/src/profiles/collections/arc.rs

Lines changed: 311 additions & 0 deletions
@@ -0,0 +1,311 @@
// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0
// This is heavily inspired by the standard library's `Arc` implementation,
// which is dual-licensed as Apache-2.0 or MIT.

use allocator_api2::alloc::{AllocError, Allocator, Global};
use allocator_api2::boxed::Box;
use core::sync::atomic::{fence, AtomicUsize, Ordering};
use core::{alloc::Layout, fmt, mem::ManuallyDrop, ptr};
use core::{marker::PhantomData, ops::Deref, ptr::NonNull};
use crossbeam_utils::CachePadded;

/// A thread-safe reference-counting pointer with only strong references.
///
/// This type is similar to `std::sync::Arc` but intentionally omits APIs that
/// can panic or abort the process. In particular:
/// - There are no weak references.
/// - Cloning uses [`Arc::try_clone`], which returns an error on reference-count overflow instead of
///   aborting the process.
/// - Construction uses fallible allocation via [`Arc::try_new`].
///
/// Deref gives shared access to the inner value; mutation should use interior
/// mutability primitives as with `std::sync::Arc`.
#[repr(C)]
#[derive(Debug)]
pub struct Arc<T, A: Allocator = Global> {
    ptr: NonNull<ArcInner<T>>,
    alloc: A,
    phantom: PhantomData<ArcInner<T>>,
}

// repr(C) prevents field reordering that could affect raw-pointer helpers.
#[repr(C)]
struct ArcInner<T> {
    refcount: CachePadded<AtomicUsize>,
    data: CachePadded<T>,
}

impl<T> ArcInner<T> {
    fn from_ptr<'a>(ptr: *const T) -> &'a Self {
        let data = ptr.cast::<u8>();
        let data_offset = Arc::<T>::data_offset();
        let bytes_ptr = unsafe { data.sub(data_offset) };
        let arc_ptr = bytes_ptr as *mut ArcInner<T>;
        unsafe { &*arc_ptr }
    }

    fn try_clone(&self) -> Result<(), ArcOverflow> {
        if self.refcount.fetch_add(1, Ordering::Relaxed) > MAX_REFCOUNT {
            self.refcount.fetch_sub(1, Ordering::Relaxed);
            return Err(ArcOverflow);
        }
        Ok(())
    }
}

impl<T> Arc<T> {
    pub fn try_new(data: T) -> Result<Arc<T, Global>, AllocError> {
        Self::try_new_in(data, Global)
    }

    /// Tries to increment the reference count using only a pointer to the
    /// inner `T`. This does not create an `Arc<T>` instance.
    ///
    /// # Safety
    /// - `ptr` must be a valid pointer to the `T` inside an `Arc<T>` allocation produced by this
    ///   module. Passing any other pointer is undefined behavior.
    /// - There must be at least one existing reference alive when called.
    pub unsafe fn try_increment_count(ptr: *const T) -> Result<(), ArcOverflow> {
        let inner = ArcInner::from_ptr(ptr);
        inner.try_clone()
    }
}

impl<T, A: Allocator> Arc<T, A> {
    /// Constructs a new `Arc<T, A>` in the provided allocator, returning an
    /// error if allocation fails.
    pub fn try_new_in(data: T, alloc: A) -> Result<Arc<T, A>, AllocError> {
        let inner = ArcInner {
            refcount: CachePadded::new(AtomicUsize::new(1)),
            data: CachePadded::new(data),
        };
        let boxed = Box::try_new_in(inner, alloc)?;
        let (ptr, alloc) = Box::into_non_null(boxed);
        Ok(Arc {
            ptr,
            alloc,
            phantom: PhantomData,
        })
    }

    /// Returns the inner value, if the `Arc` has exactly one reference.
    ///
    /// Otherwise, an [`Err`] is returned with the same `Arc` that was passed
    /// in.
    ///
    /// It is strongly recommended to use [`Arc::into_inner`] instead if you
    /// don't keep the `Arc` in the [`Err`] case.
    pub fn try_unwrap(this: Self) -> Result<T, Self> {
        // Attempt to take unique ownership by transitioning strong: 1 -> 0
        let inner_ref = unsafe { this.ptr.as_ref() };
        if inner_ref
            .refcount
            .compare_exchange(1, 0, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            // We have unique ownership; move out T and deallocate without dropping T.
            let this = ManuallyDrop::new(this);
            let ptr = this.ptr.as_ptr();
            let alloc: A = unsafe { ptr::read(&this.alloc) };
            // Reconstruct a Box to ArcInner and convert into inner to avoid double-drop of T
            let boxed: Box<ArcInner<T>, A> = unsafe { Box::from_raw_in(ptr, alloc) };
            let ArcInner { refcount: _, data } = Box::into_inner(boxed);
            // We moved out `data` above, so do not use `data` here; free already done via
            // into_inner
            Ok(CachePadded::into_inner(data))
        } else {
            Err(this)
        }
    }

    pub fn into_inner(this: Self) -> Option<T> {
        // Prevent running Drop; we will manage the refcount and allocation manually.
        let this = ManuallyDrop::new(this);
        let inner = unsafe { this.ptr.as_ref() };
        if inner.refcount.fetch_sub(1, Ordering::Release) != 1 {
            return None;
        }
        fence(Ordering::Acquire);

        // We are the last strong reference. Move out T and free the allocation
        // without dropping T.
        let ptr = this.ptr.as_ptr();
        let alloc: A = unsafe { ptr::read(&this.alloc) };
        let boxed: Box<ArcInner<T>, A> = unsafe { Box::from_raw_in(ptr, alloc) };
        let ArcInner { refcount: _, data } = Box::into_inner(boxed);
        Some(CachePadded::into_inner(data))
    }

    /// Returns a raw non-null pointer to the inner value. The pointer is valid
    /// as long as there is at least one strong reference alive.
    #[inline]
    pub fn as_ptr(&self) -> NonNull<T> {
        let ptr = NonNull::as_ptr(self.ptr);
        // SAFETY: `ptr` points to a valid `ArcInner<T>` allocation. Taking the
        // address of the `data` field preserves provenance unlike going
        // through Deref.
        let data = unsafe { ptr::addr_of_mut!((*ptr).data) };
        // SAFETY: data field address is derived from a valid NonNull.
        unsafe { NonNull::new_unchecked(data as *mut T) }
    }

    /// Converts the Arc into a non-null pointer to the inner value, without
    /// decreasing the reference count.
    ///
    /// The caller must later call `Arc::from_raw` with the same pointer exactly
    /// once to avoid leaking the allocation.
    #[inline]
    #[must_use = "losing the pointer will leak memory"]
    pub fn into_raw(this: Self) -> NonNull<T> {
        let this = ManuallyDrop::new(this);
        // Reuse as_ptr logic without dropping `this`.
        Arc::as_ptr(&this)
    }
}

// SAFETY: `Arc<T, A>` is Send and Sync iff `T` is Send and Sync.
unsafe impl<T: Send + Sync, A: Allocator> Send for Arc<T, A> {}
unsafe impl<T: Send + Sync, A: Allocator> Sync for Arc<T, A> {}

impl<T, A: Allocator> Arc<T, A> {
    #[inline]
    fn inner(&self) -> &ArcInner<T> {
        // SAFETY: `ptr` is a valid, live allocation managed by this Arc
        unsafe { self.ptr.as_ref() }
    }
}

/// Error returned when the reference count would overflow.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ArcOverflow;

impl fmt::Display for ArcOverflow {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("arc: reference count overflow")
    }
}

impl core::error::Error for ArcOverflow {}

/// A limit on the number of references that may be made to an `Arc`.
const MAX_REFCOUNT: usize = isize::MAX as usize;

impl<T, A: Allocator + Clone> Arc<T, A> {
    /// Fallible clone that increments the strong reference count.
    ///
    /// Returns an error if the reference count would exceed `isize::MAX`.
    pub fn try_clone(&self) -> Result<Self, ArcOverflow> {
        let inner = self.inner();
        inner.try_clone()?;
        Ok(Arc {
            ptr: self.ptr,
            alloc: self.alloc.clone(),
            phantom: PhantomData,
        })
    }
}

impl<T, A: Allocator> Drop for Arc<T, A> {
    fn drop(&mut self) {
        let inner = self.inner();
        if inner.refcount.fetch_sub(1, Ordering::Release) == 1 {
            // Synchronize with other threads that might have modified the data
            // before dropping the last strong reference.
            // Raymond Chen wrote a little blog article about it:
            // https://devblogs.microsoft.com/oldnewthing/20251015-00/?p=111686
            fence(Ordering::Acquire);
            // SAFETY: this was the last strong reference; reclaim allocation
            let ptr = self.ptr.as_ptr();
            // Move out allocator for deallocation, but prevent double-drop of `alloc`
            let alloc: A = unsafe { ptr::read(&self.alloc) };
            unsafe { drop(Box::<ArcInner<T>, A>::from_raw_in(ptr, alloc)) };
        }
    }
}

impl<T, A: Allocator> Deref for Arc<T, A> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        // SAFETY: The allocation outlives `self` while any strong refs exist.
        unsafe { &self.ptr.as_ref().data }
    }
}

impl<T, A: Allocator> Arc<T, A> {
    #[inline]
    fn data_offset() -> usize {
        // Layout of ArcInner<T> is repr(C): [CachePadded<AtomicUsize>, CachePadded<T>]
        let header = Layout::new::<CachePadded<AtomicUsize>>();
        match header.extend(Layout::new::<CachePadded<T>>()) {
            Ok((_layout, offset)) => offset,
            Err(_) => {
                // Fallback: compute padding manually to avoid unwrap. This should
                // not fail in practice for valid types.
                let align = Layout::new::<CachePadded<T>>().align();
                let size = header.size();
                let padding = (align - (size % align)) % align;
                size + padding
            }
        }
    }

    /// Recreates an `Arc<T, A>` from a raw pointer produced by `Arc::into_raw`.
    ///
    /// # Safety
    /// - `ptr` must have been returned by a previous call to `Arc::<T, A>::into_raw`.
    /// - If `ptr` has been cast, the cast must be to a type with a compatible repr.
    /// - It must not be used to create multiple owning `Arc`s without corresponding `into_raw`
    ///   calls; otherwise the refcount will be decremented multiple times.
    #[inline]
    pub unsafe fn from_raw_in(ptr: NonNull<T>, alloc: A) -> Self {
        let data = ptr.as_ptr() as *const u8;
        let arc_ptr_u8 = data.sub(Self::data_offset());
        let arc_ptr = arc_ptr_u8 as *mut ArcInner<T>;
        Arc {
            ptr: NonNull::new_unchecked(arc_ptr),
            alloc,
            phantom: PhantomData,
        }
    }
}

impl<T> Arc<T> {
    /// Recreates an `Arc<T>` in the `Global` allocator from a raw pointer
    /// produced by `Arc::into_raw`.
    ///
    /// # Safety
    /// See [`Arc::from_raw_in`] for requirements.
    #[inline]
    pub unsafe fn from_raw(ptr: NonNull<T>) -> Self {
        Arc::from_raw_in(ptr, Global)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn try_new_and_unwrap_unique() {
        let arc = Arc::try_new(123u32).unwrap();
        let v = Arc::try_unwrap(arc).ok().unwrap();
        assert_eq!(v, 123);
    }

    #[test]
    fn try_unwrap_fails_when_shared() {
        let arc = Arc::try_new(5usize).unwrap();
        let arc2 = arc.try_clone().unwrap();
        assert!(Arc::try_unwrap(arc).is_err());
        assert_eq!(*arc2, 5);
    }

    #[test]
    fn deref_access() {
        let arc = Arc::try_new("abc").unwrap();
        assert_eq!(arc.len(), 3);
        assert_eq!(*arc, "abc");
    }
}
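
To see the raw-pointer contract end to end, here is a short sketch — not part of this commit — written in the same style as the unit tests above and assuming the same `use super::*;` context:

#[cfg(test)]
mod usage_sketch {
    use super::*;
    use core::ptr::NonNull;

    #[test]
    fn raw_pointer_round_trip() {
        let a = Arc::try_new(41u64).unwrap();
        let b = a.try_clone().unwrap();
        assert_eq!(*a, *b);

        // into_raw leaks one strong reference; from_raw reclaims it.
        let raw: NonNull<u64> = Arc::into_raw(b);
        // SAFETY: `raw` came from Arc::into_raw above and is reclaimed
        // exactly once.
        let b = unsafe { Arc::from_raw(raw) };
        drop(b);

        // `a` is now the only reference, so the value can be moved out.
        assert_eq!(Arc::into_inner(a), Some(41));
    }
}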

libdd-profiling/src/profiles/collections/error.rs

Lines changed: 2 additions & 0 deletions
@@ -8,6 +8,8 @@ pub enum SetError {
     InvalidArgument,
     #[error("set error: out of memory")]
     OutOfMemory,
+    #[error("set error: reference count overflow")]
+    ReferenceCountOverflow,
 }

 impl From<libdd_alloc::AllocError> for SetError {
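
The new variant pairs naturally with the `ArcOverflow` error introduced in arc.rs above. The wiring below is a sketch, not shown in this excerpt of the diff; it simply mirrors the `From<libdd_alloc::AllocError>` impl visible in the context lines, and assumes `ArcOverflow` is in scope via the `collections` re-exports:

// Hypothetical sketch -- the actual impl, if any, is not shown in this excerpt.
impl From<ArcOverflow> for SetError {
    fn from(_: ArcOverflow) -> Self {
        SetError::ReferenceCountOverflow
    }
}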

libdd-profiling/src/profiles/collections/mod.rs

Lines changed: 4 additions & 0 deletions
@@ -1,15 +1,19 @@
 // Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/
 // SPDX-License-Identifier: Apache-2.0

+mod arc;
 mod error;
+mod parallel;
 mod set;
 mod slice_set;
 mod string_set;
 mod thin_str;

 pub type SetHasher = core::hash::BuildHasherDefault<rustc_hash::FxHasher>;

+pub use arc::*;
 pub use error::*;
+pub use parallel::*;
 pub use set::*;
 pub use slice_set::*;
 pub use string_set::*;

libdd-profiling/src/profiles/collections/parallel/mod.rs

Lines changed: 12 additions & 0 deletions
@@ -0,0 +1,12 @@
// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

mod set;
mod sharded;
mod slice_set;
mod string_set;

pub use set::*;
pub use sharded::*;
pub use slice_set::*;
pub use string_set::*;
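
None of the four submodules appear in this excerpt. For orientation only, here is a generic sketch of the sharded-set pattern that the commit title and the new `parking_lot` dependency point at; every name below is illustrative rather than the commit's actual code, and `SetHasher` is redefined locally so the sketch stands alone:

use core::hash::{BuildHasher, BuildHasherDefault, Hash, Hasher};
use std::collections::HashSet;
use parking_lot::RwLock;

type SetHasher = BuildHasherDefault<rustc_hash::FxHasher>;

// A sharded set routes each key to one of N independently locked shards by
// hash, so concurrent inserts contend on one shard's lock rather than a
// single global lock.
pub struct ShardedSet<T, const N: usize = 16> {
    shards: [RwLock<HashSet<T, SetHasher>>; N],
    hasher: SetHasher,
}

impl<T: Eq + Hash, const N: usize> ShardedSet<T, N> {
    pub fn new() -> Self {
        Self {
            shards: core::array::from_fn(|_| RwLock::new(HashSet::default())),
            hasher: SetHasher::default(),
        }
    }

    fn shard_index(&self, value: &T) -> usize {
        let mut h = self.hasher.build_hasher();
        value.hash(&mut h);
        (h.finish() as usize) % N
    }

    /// Returns true if the value was newly inserted.
    pub fn insert(&self, value: T) -> bool {
        self.shards[self.shard_index(&value)].write().insert(value)
    }

    pub fn contains(&self, value: &T) -> bool {
        self.shards[self.shard_index(value)].read().contains(value)
    }
}

A string-set variant would presumably hand out stable references to interned strings, which is where the fallible `Arc` above comes in.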
