Skip to content

Commit b519403

Browse files
committed
Auto merge of #8 - japaric:atomic, r=<try>
use atomics where available cc #5 cc @pftbest
2 parents f7ca3b5 + 158d19b commit b519403

File tree

7 files changed

+176
-20
lines changed

7 files changed

+176
-20
lines changed

blacklist.txt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
# false positives from thread::spawn (?)
2+
race:<alloc::arc::Arc<T>>::drop_slow
3+
race:__GI___call_tls_dtors
4+
race:alloc::heap::{{impl}}::dealloc
5+
race:core::ptr::drop_in_place<core::option::Option<core::result::Result<(), alloc::boxed::Box<Any>>>>
6+
race:core::ptr::drop_in_place<core::result::Result<(), alloc::boxed::Box<Any>>>

ci/install.sh

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,11 @@ main() {
1111
rustup component list | grep 'rust-src.*installed' || \
1212
rustup component add rust-src
1313
;;
14+
x86_64-unknown-linux-gnu)
15+
;;
1416
*)
17+
# unhandled case
18+
exit 1
1519
;;
1620
esac
1721
}

ci/script.sh

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,23 @@ main() {
55
thumb*m-none-eabi)
66
xargo check --target $TARGET
77
;;
8-
*)
8+
x86_64-unknown-linux-gnu)
99
cargo check --target $TARGET
10+
11+
cargo test --target $TARGET
12+
cargo test --target $TARGET --release
13+
14+
export TSAN_OPTIONS="suppressions=$(pwd)/blacklist.txt"
15+
export RUSTFLAGS="-Z sanitizer=thread"
16+
17+
cargo test --test tsan --target $TARGET
18+
cargo test --test tsan --target $TARGET --release
19+
;;
20+
*)
21+
# unhandled case
22+
exit 1
1023
;;
1124
esac
1225
}
26+
27+
main

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,9 @@
144144
//! is_send::<Vec<NotSend, [NotSend; 4]>>();
145145
//! ```
146146
147+
#![cfg_attr(target_has_atomic = "ptr", feature(const_atomic_usize_new))]
147148
#![deny(missing_docs)]
149+
#![feature(cfg_target_has_atomic)]
148150
#![feature(const_fn)]
149151
#![feature(shared)]
150152
#![feature(unsize)]

src/ring_buffer/mod.rs

Lines changed: 70 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
33
use core::marker::{PhantomData, Unsize};
44
use core::ptr;
5+
#[cfg(target_has_atomic = "ptr")]
6+
use core::sync::atomic::{AtomicUsize, Ordering};
57

68
use untagged_option::UntaggedOption;
79

@@ -18,11 +20,16 @@ where
1820
A: Unsize<[T]>,
1921
{
2022
_marker: PhantomData<[T]>,
21-
buffer: UntaggedOption<A>,
23+
2224
// this is from where we dequeue items
23-
head: usize,
25+
#[cfg(target_has_atomic = "ptr")] head: AtomicUsize,
26+
#[cfg(not(target_has_atomic = "ptr"))] head: usize,
27+
2428
// this is where we enqueue new items
25-
tail: usize,
29+
#[cfg(target_has_atomic = "ptr")] tail: AtomicUsize,
30+
#[cfg(not(target_has_atomic = "ptr"))] tail: usize,
31+
32+
buffer: UntaggedOption<A>,
2633
}
2734

2835
impl<T, A> RingBuffer<T, A>
@@ -35,7 +42,13 @@ where
3542
RingBuffer {
3643
_marker: PhantomData,
3744
buffer: UntaggedOption::none(),
45+
#[cfg(target_has_atomic = "ptr")]
46+
head: AtomicUsize::new(0),
47+
#[cfg(not(target_has_atomic = "ptr"))]
3848
head: 0,
49+
#[cfg(target_has_atomic = "ptr")]
50+
tail: AtomicUsize::new(0),
51+
#[cfg(not(target_has_atomic = "ptr"))]
3952
tail: 0,
4053
}
4154
}
@@ -49,11 +62,22 @@ where
4962
/// Returns the item in the front of the queue, or `None` if the queue is empty
5063
pub fn dequeue(&mut self) -> Option<T> {
5164
let n = self.capacity() + 1;
65+
66+
#[cfg(target_has_atomic = "ptr")]
67+
let head = self.head.get_mut();
68+
#[cfg(not(target_has_atomic = "ptr"))]
69+
let head = &mut self.head;
70+
71+
#[cfg(target_has_atomic = "ptr")]
72+
let tail = self.tail.get_mut();
73+
#[cfg(not(target_has_atomic = "ptr"))]
74+
let tail = &mut self.tail;
75+
5276
let buffer: &[T] = unsafe { self.buffer.as_ref() };
5377

54-
if self.head != self.tail {
55-
let item = unsafe { ptr::read(buffer.get_unchecked(self.head)) };
56-
self.head = (self.head + 1) % n;
78+
if *head != *tail {
79+
let item = unsafe { ptr::read(buffer.get_unchecked(*head)) };
80+
*head = (*head + 1) % n;
5781
Some(item)
5882
} else {
5983
None
@@ -65,14 +89,25 @@ where
6589
/// Returns `BufferFullError` if the queue is full
6690
pub fn enqueue(&mut self, item: T) -> Result<(), BufferFullError> {
6791
let n = self.capacity() + 1;
92+
93+
#[cfg(target_has_atomic = "ptr")]
94+
let head = self.head.get_mut();
95+
#[cfg(not(target_has_atomic = "ptr"))]
96+
let head = &mut self.head;
97+
98+
#[cfg(target_has_atomic = "ptr")]
99+
let tail = self.tail.get_mut();
100+
#[cfg(not(target_has_atomic = "ptr"))]
101+
let tail = &mut self.tail;
102+
68103
let buffer: &mut [T] = unsafe { self.buffer.as_mut() };
69104

70-
let next_tail = (self.tail + 1) % n;
71-
if next_tail != self.head {
105+
let next_tail = (*tail + 1) % n;
106+
if next_tail != *head {
72107
// NOTE(ptr::write) the memory slot that we are about to write to is uninitialized. We
73108
// use `ptr::write` to avoid running `T`'s destructor on the uninitialized memory
74-
unsafe { ptr::write(buffer.get_unchecked_mut(self.tail), item) }
75-
self.tail = next_tail;
109+
unsafe { ptr::write(buffer.get_unchecked_mut(*tail), item) }
110+
*tail = next_tail;
76111
Ok(())
77112
} else {
78113
Err(BufferFullError)
@@ -81,10 +116,20 @@ where
81116

82117
/// Returns the number of elements in the queue
83118
pub fn len(&self) -> usize {
84-
if self.head > self.tail {
85-
self.head - self.tail
119+
#[cfg(target_has_atomic = "ptr")]
120+
let head = self.head.load(Ordering::Relaxed);
121+
#[cfg(not(target_has_atomic = "ptr"))]
122+
let head = self.head;
123+
124+
#[cfg(target_has_atomic = "ptr")]
125+
let tail = self.tail.load(Ordering::Relaxed);
126+
#[cfg(not(target_has_atomic = "ptr"))]
127+
let tail = self.tail;
128+
129+
if head > tail {
130+
head - tail
86131
} else {
87-
self.tail - self.head
132+
tail - head
88133
}
89134
}
90135

@@ -176,9 +221,14 @@ where
176221

177222
fn next(&mut self) -> Option<&'a T> {
178223
if self.index < self.len {
224+
#[cfg(not(target_has_atomic = "ptr"))]
225+
let head = self.rb.head;
226+
#[cfg(target_has_atomic = "ptr")]
227+
let head = self.rb.head.load(Ordering::Relaxed);
228+
179229
let buffer: &[T] = unsafe { self.rb.buffer.as_ref() };
180230
let ptr = buffer.as_ptr();
181-
let i = (self.rb.head + self.index) % (self.rb.capacity() + 1);
231+
let i = (head + self.index) % (self.rb.capacity() + 1);
182232
self.index += 1;
183233
Some(unsafe { &*ptr.offset(i as isize) })
184234
} else {
@@ -196,10 +246,15 @@ where
196246

197247
fn next(&mut self) -> Option<&'a mut T> {
198248
if self.index < self.len {
249+
#[cfg(not(target_has_atomic = "ptr"))]
250+
let head = self.rb.head;
251+
#[cfg(target_has_atomic = "ptr")]
252+
let head = self.rb.head.load(Ordering::Relaxed);
253+
199254
let capacity = self.rb.capacity() + 1;
200255
let buffer: &mut [T] = unsafe { self.rb.buffer.as_mut() };
201256
let ptr: *mut T = buffer.as_mut_ptr();
202-
let i = (self.rb.head + self.index) % capacity;
257+
let i = (head + self.index) % capacity;
203258
self.index += 1;
204259
Some(unsafe { &mut *ptr.offset(i as isize) })
205260
} else {

src/ring_buffer/spsc.rs

Lines changed: 58 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use core::ptr::{self, Shared};
22
use core::marker::Unsize;
3+
#[cfg(target_has_atomic = "ptr")]
4+
use core::sync::atomic::Ordering;
35

46
use BufferFullError;
57
use ring_buffer::RingBuffer;
@@ -10,8 +12,11 @@ where
1012
{
1113
/// Splits a statically allocated ring buffer into producer and consumer end points
1214
///
13-
/// *Warning* the current implementation only supports single core processors. It's also fine to
14-
/// use both end points on the same core of a multi-core processor.
15+
/// **Warning** the current single producer single consumer implementation only supports
16+
/// multi-core systems where `cfg(target_has_atomic = "ptr")` holds for all the cores. For
17+
/// example, a dual core system where one core is a Cortex-M0 and the other is a Cortex-M3
18+
/// is not supported because Cortex-M0 (`thumbv6m-none-eabi`) doesn't satisfy
19+
/// `cfg(target_has_atomic = "ptr")`. All single core systems are supported.
1520
pub fn split(&'static mut self) -> (Producer<T, A>, Consumer<T, A>) {
1621
(
1722
Producer {
@@ -39,8 +44,30 @@ where
3944
A: Unsize<[T]>,
4045
{
4146
/// Returns the item in the front of the queue, or `None` if the queue is empty
47+
#[cfg(target_has_atomic = "ptr")]
48+
pub fn dequeue(&mut self) -> Option<T> {
49+
let rb = unsafe { self.rb.as_ref() };
50+
51+
let tail = rb.tail.load(Ordering::Relaxed);
52+
let head = rb.head.load(Ordering::Acquire);
53+
54+
let n = rb.capacity() + 1;
55+
let buffer: &[T] = unsafe { rb.buffer.as_ref() };
56+
57+
if head != tail {
58+
let item = unsafe { ptr::read(buffer.get_unchecked(head)) };
59+
rb.head.store((head + 1) % n, Ordering::Release);
60+
Some(item)
61+
} else {
62+
None
63+
}
64+
}
65+
66+
/// Returns the item in the front of the queue, or `None` if the queue is empty
67+
#[cfg(not(target_has_atomic = "ptr"))]
4268
pub fn dequeue(&mut self) -> Option<T> {
4369
let rb = unsafe { self.rb.as_mut() };
70+
4471
let n = rb.capacity() + 1;
4572
let buffer: &[T] = unsafe { rb.buffer.as_ref() };
4673

@@ -80,17 +107,44 @@ where
80107
/// Adds an `item` to the end of the queue
81108
///
82109
/// Returns `BufferFullError` if the queue is full
110+
#[cfg(target_has_atomic = "ptr")]
111+
pub fn enqueue(&mut self, item: T) -> Result<(), BufferFullError> {
112+
let rb = unsafe { self.rb.as_mut() };
113+
114+
let head = rb.head.load(Ordering::Relaxed);
115+
let tail = rb.tail.load(Ordering::Acquire);
116+
117+
let n = rb.capacity() + 1;
118+
let next_tail = (tail + 1) % n;
119+
120+
let buffer: &mut [T] = unsafe { rb.buffer.as_mut() };
121+
122+
if next_tail != head {
123+
// NOTE(ptr::write) the memory slot that we are about to write to is uninitialized. We
124+
// use `ptr::write` to avoid running `T`'s destructor on the uninitialized memory
125+
unsafe { ptr::write(buffer.get_unchecked_mut(tail), item) }
126+
rb.tail.store(next_tail, Ordering::Release);
127+
Ok(())
128+
} else {
129+
Err(BufferFullError)
130+
}
131+
}
132+
133+
/// Adds an `item` to the end of the queue
134+
///
135+
/// Returns `BufferFullError` if the queue is full
136+
#[cfg(not(target_has_atomic = "ptr"))]
83137
pub fn enqueue(&mut self, item: T) -> Result<(), BufferFullError> {
84138
let rb = unsafe { self.rb.as_mut() };
139+
85140
let n = rb.capacity() + 1;
86141
let buffer: &mut [T] = unsafe { rb.buffer.as_mut() };
87142

88143
let next_tail = (rb.tail + 1) % n;
89144
// NOTE(volatile) the value of `head` can change at any time in the execution context of the
90145
// producer so we inform this to the compiler using a volatile load
91146
if next_tail != unsafe { ptr::read_volatile(&rb.head) } {
92-
// NOTE(ptr::write) the memory slot that we are about to write to is uninitialized. We
93-
// use `ptr::write` to avoid running `T`'s destructor on the uninitialized memory
147+
// NOTE(ptr::write) see the other `enqueue` implementation above for details
94148
unsafe { ptr::write(buffer.get_unchecked_mut(rb.tail), item) }
95149
rb.tail = next_tail;
96150
Ok(())

tests/tsan.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
extern crate heapless;
2+
3+
use std::thread;
4+
5+
use heapless::RingBuffer;
6+
7+
#[test]
8+
fn tsan() {
9+
static mut RB: RingBuffer<i32, [i32; 4]> = RingBuffer::new();
10+
11+
unsafe { RB.split() }.0.enqueue(0).unwrap();
12+
13+
thread::spawn(|| {
14+
unsafe { RB.split() }.0.enqueue(1).unwrap();
15+
});
16+
17+
thread::spawn(|| {
18+
unsafe { RB.split() }.1.dequeue().unwrap();
19+
});
20+
}

0 commit comments

Comments
 (0)