@@ -22,51 +22,71 @@ use core::cell::UnsafeCell;
use sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
+use super::cache_aligned::CacheAligned;
+
// Node within the linked list queue of messages to send
struct Node<T> {
    // FIXME: this could be an uninitialized T if we're careful enough, and
    // that would reduce memory usage (and be a bit faster).
    // is it worth it?
    value: Option<T>,           // nullable for re-use of nodes
+    cached: bool,               // This node goes into the node cache
    next: AtomicPtr<Node<T>>,   // next node in the queue
}

/// The single-producer single-consumer queue. This structure is not cloneable,
/// but it can be safely shared in an Arc if it is guaranteed that there
/// is only one popper and one pusher touching the queue at any one point in
/// time.
-pub struct Queue<T> {
+pub struct Queue<T, ProducerAddition = (), ConsumerAddition = ()> {
    // consumer fields
+    consumer: CacheAligned<Consumer<T, ConsumerAddition>>,
+
+    // producer fields
+    producer: CacheAligned<Producer<T, ProducerAddition>>,
+}
+
+struct Consumer<T, Addition> {
    tail: UnsafeCell<*mut Node<T>>, // where to pop from
    tail_prev: AtomicPtr<Node<T>>, // where to pop from
+    cache_bound: usize, // maximum cache size
+    cached_nodes: AtomicUsize, // number of nodes marked as cachable
+    addition: Addition,
+}

-    // producer fields
+struct Producer<T, Addition> {
    head: UnsafeCell<*mut Node<T>>,      // where to push to
    first: UnsafeCell<*mut Node<T>>,     // where to get new nodes from
    tail_copy: UnsafeCell<*mut Node<T>>, // between first/tail
-
-    // Cache maintenance fields. Additions and subtractions are stored
-    // separately in order to allow them to use nonatomic addition/subtraction.
-    cache_bound: usize,
-    cache_additions: AtomicUsize,
-    cache_subtractions: AtomicUsize,
+    addition: Addition,
}

-unsafe impl<T: Send> Send for Queue<T> { }
+unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Send for Queue<T, P, C> { }

-unsafe impl<T: Send> Sync for Queue<T> { }
+unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Sync for Queue<T, P, C> { }

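Note: the `CacheAligned` wrapper imported at the top lives in a sibling `cache_aligned` module that this diff does not include. A minimal sketch of what that wrapper plausibly provides for the code here to compile; the 64-byte alignment value and the visibility are assumptions:

use core::ops::{Deref, DerefMut};

// Hypothetical sketch: align the wrapped value to a 64-byte cache line so
// the producer and consumer halves never share a line (no false sharing).
#[repr(align(64))]
pub struct CacheAligned<T>(pub T);

impl<T> CacheAligned<T> {
    pub fn new(t: T) -> Self {
        CacheAligned(t)
    }
}

// Deref lets `self.producer.head` reach through the wrapper; the tuple
// field `.0` stays available where the inner struct is named directly.
impl<T> Deref for CacheAligned<T> {
    type Target = T;
    fn deref(&self) -> &T {
        &self.0
    }
}

impl<T> DerefMut for CacheAligned<T> {
    fn deref_mut(&mut self) -> &mut T {
        &mut self.0
    }
}
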
impl<T> Node<T> {
    fn new() -> *mut Node<T> {
        Box::into_raw(box Node {
            value: None,
+            cached: false,
            next: AtomicPtr::new(ptr::null_mut::<Node<T>>()),
        })
    }
}

-impl<T> Queue<T> {
-    /// Creates a new queue.
+impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerAddition> {
+
+    /// Creates a new queue with the given additional elements in the
+    /// producer and consumer portions of the queue.
+    ///
+    /// Due to the performance implications of cache contention, we wish to
+    /// keep fields used mainly by the producer on a separate cache line from
+    /// those used by the consumer.
+    /// Since cache lines are usually 64 bytes, it is unreasonably expensive
+    /// to allocate one for small fields, so we allow users to insert
+    /// additional fields into the cache lines already allocated by this
+    /// queue for the producer and consumer.
    ///
    /// This is unsafe as the type system doesn't enforce a single
    /// consumer-producer relationship. It also allows the consumer to `pop`
@@ -83,19 +103,28 @@ impl<T> Queue<T> {
    /// cache (if desired). If the value is 0, then the cache has
    /// no bound. Otherwise, the cache will never grow larger than
    /// `bound` (although the queue itself could be much larger).
-    pub unsafe fn new(bound: usize) -> Queue<T> {
+    pub unsafe fn with_additions(
+        bound: usize,
+        producer_addition: ProducerAddition,
+        consumer_addition: ConsumerAddition,
+    ) -> Self {
        let n1 = Node::new();
        let n2 = Node::new();
        (*n1).next.store(n2, Ordering::Relaxed);
        Queue {
-            tail: UnsafeCell::new(n2),
-            tail_prev: AtomicPtr::new(n1),
-            head: UnsafeCell::new(n2),
-            first: UnsafeCell::new(n1),
-            tail_copy: UnsafeCell::new(n1),
-            cache_bound: bound,
-            cache_additions: AtomicUsize::new(0),
-            cache_subtractions: AtomicUsize::new(0),
+            consumer: CacheAligned::new(Consumer {
+                tail: UnsafeCell::new(n2),
+                tail_prev: AtomicPtr::new(n1),
+                cache_bound: bound,
+                cached_nodes: AtomicUsize::new(0),
+                addition: consumer_addition
+            }),
+            producer: CacheAligned::new(Producer {
+                head: UnsafeCell::new(n2),
+                first: UnsafeCell::new(n1),
+                tail_copy: UnsafeCell::new(n1),
+                addition: producer_addition
+            }),
        }
    }

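For illustration, a hypothetical caller of the new constructor; the `AtomicUsize` producer addition is invented for this sketch and assumes access to this internal type:

// Hypothetical usage sketch: park a producer-side send counter in the
// producer's cache line instead of allocating a separate aligned field.
use std::sync::atomic::{AtomicUsize, Ordering};

fn demo() {
    let q = unsafe { Queue::with_additions(32, AtomicUsize::new(0), ()) };
    q.push(10);
    q.producer_addition().fetch_add(1, Ordering::Relaxed); // one send observed
    assert_eq!(q.pop(), Some(10));
}
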
@@ -109,35 +138,25 @@ impl<T> Queue<T> {
            assert!((*n).value.is_none());
            (*n).value = Some(t);
            (*n).next.store(ptr::null_mut(), Ordering::Relaxed);
-            (**self.head.get()).next.store(n, Ordering::Release);
-            *self.head.get() = n;
+            (**self.producer.head.get()).next.store(n, Ordering::Release);
+            *(&self.producer.head).get() = n;
        }
    }

    unsafe fn alloc(&self) -> *mut Node<T> {
        // First try to see if we can consume the 'first' node for our uses.
-        // We try to avoid as many atomic instructions as possible here, so
-        // the addition to cache_subtractions is not atomic (plus we're the
-        // only one subtracting from the cache).
-        if *self.first.get() != *self.tail_copy.get() {
-            if self.cache_bound > 0 {
-                let b = self.cache_subtractions.load(Ordering::Relaxed);
-                self.cache_subtractions.store(b + 1, Ordering::Relaxed);
-            }
-            let ret = *self.first.get();
-            *self.first.get() = (*ret).next.load(Ordering::Relaxed);
+        if *self.producer.first.get() != *self.producer.tail_copy.get() {
+            let ret = *self.producer.first.get();
+            *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
            return ret;
        }
        // If the above fails, then update our copy of the tail and try
        // again.
-        *self.tail_copy.get() = self.tail_prev.load(Ordering::Acquire);
-        if *self.first.get() != *self.tail_copy.get() {
-            if self.cache_bound > 0 {
-                let b = self.cache_subtractions.load(Ordering::Relaxed);
-                self.cache_subtractions.store(b + 1, Ordering::Relaxed);
-            }
-            let ret = *self.first.get();
-            *self.first.get() = (*ret).next.load(Ordering::Relaxed);
+        *self.producer.0.tail_copy.get() =
+            self.consumer.tail_prev.load(Ordering::Acquire);
+        if *self.producer.first.get() != *self.producer.tail_copy.get() {
+            let ret = *self.producer.first.get();
+            *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
            return ret;
        }
        // If all of that fails, then we have to allocate a new node
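The only cross-thread synchronization left in `alloc` is the `Acquire` load of the consumer's `tail_prev`, which pairs with the `Release` store performed in `pop` below. A stand-alone sketch of that pairing, with function names invented for illustration:

use std::sync::atomic::{AtomicPtr, Ordering};

// Release/Acquire pairing sketch: everything the consumer wrote to a node
// before the Release store is visible to the producer once the Acquire
// load observes that same pointer.
fn consumer_side<T>(tail_prev: &AtomicPtr<T>, released: *mut T) {
    // ... consumer is finished with `released` ...
    tail_prev.store(released, Ordering::Release);
}

fn producer_side<T>(tail_prev: &AtomicPtr<T>) -> *mut T {
    // Nodes strictly before this pointer are now safe to reuse.
    tail_prev.load(Ordering::Acquire)
}
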
@@ -153,27 +172,27 @@ impl<T> Queue<T> {
            // sentinel from where we should start popping from. Hence, look at
            // tail's next field and see if we can use it. If we do a pop, then
            // the current tail node is a candidate for going into the cache.
-            let tail = *self.tail.get();
+            let tail = *self.consumer.tail.get();
            let next = (*tail).next.load(Ordering::Acquire);
            if next.is_null() { return None }
            assert!((*next).value.is_some());
            let ret = (*next).value.take();

-            *self.tail.get() = next;
-            if self.cache_bound == 0 {
-                self.tail_prev.store(tail, Ordering::Release);
+            *self.consumer.0.tail.get() = next;
+            if self.consumer.cache_bound == 0 {
+                self.consumer.tail_prev.store(tail, Ordering::Release);
            } else {
-                // FIXME: this is dubious with overflow.
-                let additions = self.cache_additions.load(Ordering::Relaxed);
-                let subtractions = self.cache_subtractions.load(Ordering::Relaxed);
-                let size = additions - subtractions;
-
-                if size < self.cache_bound {
-                    self.tail_prev.store(tail, Ordering::Release);
-                    self.cache_additions.store(additions + 1, Ordering::Relaxed);
+                let cached_nodes = self.consumer.cached_nodes.load(Ordering::Relaxed);
+                if cached_nodes < self.consumer.cache_bound && !(*tail).cached {
+                    self.consumer.cached_nodes.store(cached_nodes, Ordering::Relaxed);
+                    (*tail).cached = true;
+                }
+
+                if (*tail).cached {
+                    self.consumer.tail_prev.store(tail, Ordering::Release);
                } else {
-                    (*self.tail_prev.load(Ordering::Relaxed))
-                        .next.store(next, Ordering::Relaxed);
+                    (*self.consumer.tail_prev.load(Ordering::Relaxed))
+                        .next.store(next, Ordering::Relaxed);
                    // We have successfully erased all references to 'tail', so
                    // now we can safely drop it.
                    let _: Box<Node<T>> = Box::from_raw(tail);
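The replaced additions/subtractions counters carried a FIXME about overflow; the new scheme instead records cache membership on the node itself, so each node is counted at most once. A stand-alone sketch of that mark-once accounting, as a free function invented for illustration, with the counter increment written out explicitly (the hunk above stores the loaded count back unchanged):

use std::sync::atomic::{AtomicUsize, Ordering};

// Sketch: a node is marked cachable at most once, and only while the
// number of marked nodes is below `cache_bound`. Marked nodes get
// recycled through `tail_prev`; unmarked nodes are freed immediately.
fn mark_for_cache(cached: &mut bool, cached_nodes: &AtomicUsize, cache_bound: usize) -> bool {
    let n = cached_nodes.load(Ordering::Relaxed);
    if n < cache_bound && !*cached {
        // Incrementing here is what makes `cache_bound` an upper bound.
        cached_nodes.store(n + 1, Ordering::Relaxed);
        *cached = true;
    }
    *cached
}
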
@@ -194,17 +213,25 @@ impl<T> Queue<T> {
        // This is essentially the same as above with all the popping bits
        // stripped out.
        unsafe {
-            let tail = *self.tail.get();
+            let tail = *self.consumer.tail.get();
            let next = (*tail).next.load(Ordering::Acquire);
            if next.is_null() { None } else { (*next).value.as_mut() }
        }
    }
+
+    pub fn producer_addition(&self) -> &ProducerAddition {
+        &self.producer.addition
+    }
+
+    pub fn consumer_addition(&self) -> &ConsumerAddition {
+        &self.consumer.addition
+    }
}

-impl<T> Drop for Queue<T> {
+impl<T, ProducerAddition, ConsumerAddition> Drop for Queue<T, ProducerAddition, ConsumerAddition> {
    fn drop(&mut self) {
        unsafe {
-            let mut cur = *self.first.get();
+            let mut cur = *self.producer.first.get();
            while !cur.is_null() {
                let next = (*cur).next.load(Ordering::Relaxed);
                let _n: Box<Node<T>> = Box::from_raw(cur);
@@ -224,7 +251,7 @@ mod tests {
    #[test]
    fn smoke() {
        unsafe {
-            let queue = Queue::new(0);
+            let queue = Queue::with_additions(0, (), ());
            queue.push(1);
            queue.push(2);
            assert_eq!(queue.pop(), Some(1));
@@ -241,7 +268,7 @@ mod tests {
    #[test]
    fn peek() {
        unsafe {
-            let queue = Queue::new(0);
+            let queue = Queue::with_additions(0, (), ());
            queue.push(vec![1]);

            // Ensure the borrowchecker works
@@ -264,7 +291,7 @@ mod tests {
    #[test]
    fn drop_full() {
        unsafe {
-            let q: Queue<Box<_>> = Queue::new(0);
+            let q: Queue<Box<_>> = Queue::with_additions(0, (), ());
            q.push(box 1);
            q.push(box 2);
        }
@@ -273,7 +300,7 @@ mod tests {
    #[test]
    fn smoke_bound() {
        unsafe {
-            let q = Queue::new(0);
+            let q = Queue::with_additions(0, (), ());
            q.push(1);
            q.push(2);
            assert_eq!(q.pop(), Some(1));
@@ -295,7 +322,7 @@ mod tests {
    }

    unsafe fn stress_bound(bound: usize) {
-        let q = Arc::new(Queue::new(bound));
+        let q = Arc::new(Queue::with_additions(bound, (), ()));

        let (tx, rx) = channel();
        let q2 = q.clone();