-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathspmc.go
51 lines (45 loc) · 797 Bytes
/
spmc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package onering
import (
"runtime"
"sync/atomic"
"unsafe"
)
// SPMC is a ring type built by embedding multi (declared elsewhere in the
// package), from which it takes its fields and the next/frame helpers used
// by the methods below.
//
// Put reads the write position with a plain load, while Get obtains read
// positions through r.next(&r.rp); as the name suggests, the type is
// presumably intended for a single producer and multiple consumers.
type SPMC struct {
multi
}
// Get takes one item from the ring and hands it to i via inject.
//
// It obtains a read position from r.next(&r.rp) and then waits, yielding
// with runtime.Gosched between checks, until the slot's sequence equals the
// negated position, which is the value Put stores once the slot is filled.
//
// While waiting, Get returns false if r.done is greater than zero and the
// write position has not moved past the position this call holds
// (presumably: the ring was closed before anything was written here).
//
// On success the slot's sequence is set to position+r.size, a value Put does
// not wait on as long as it is non-negative, and Get returns true.
func (r *SPMC) Get(i interface{}) bool {
	pos := r.next(&r.rp)
	slot, seq := r.frame(pos)
	want := -pos

	for atomic.LoadInt64(seq) != want {
		// Give up only when the ring is done AND the writer never reached pos.
		if atomic.LoadInt32(&r.done) > 0 && atomic.LoadInt64(&r.wp) <= pos {
			return false
		}
		runtime.Gosched()
	}

	inject(i, *slot)
	// Release the slot only after its content has been handed over.
	atomic.StoreInt64(seq, pos+r.size)
	return true
}
// Consume repeatedly pulls items with Get and passes each one to the
// callback obtained from extractfn(i), together with a pointer to a
// per-call iter value.
//
// The loop ends when the callback has set the iter's stop field or when Get
// returns false. The stop flag is checked first, so no further item is
// taken from the ring once the callback has asked to stop.
func (r *SPMC) Consume(i interface{}) {
	var (
		state iter
		item  unsafe.Pointer
	)
	handler := extractfn(i)

	for {
		if state.stop || !r.Get(&item) {
			return
		}
		handler(&state, item)
	}
}
// Put publishes i into the slot at the current write position.
//
// It waits, yielding with runtime.Gosched, while the slot's sequence is
// negative. Put itself stores the negated write position there, and Get
// replaces it with position+r.size after reading, so a negative value
// presumably means the previous content has not been consumed yet. Put then
// stores the pointer returned by extractptr(i), advances r.wp, and finally
// stores the negated position in the slot's sequence, which is the value
// Get waits for.
//
// The write position is read with a plain load on entry, so Put must not be
// called from more than one goroutine at a time.
func (r *SPMC) Put(i interface{}) {
	var (
		wp        = r.wp
		data, seq = r.frame(wp)
	)
	for atomic.LoadInt64(seq) < 0 {
		runtime.Gosched()
	}
	*data = extractptr(i)
	// Get reads r.wp with atomic.LoadInt64 from other goroutines, so the
	// update must be atomic as well; a plain r.wp++ is a data race.
	atomic.AddInt64(&r.wp, 1)
	// Store the sequence last: this is what makes the slot visible to Get.
	atomic.StoreInt64(seq, -wp)
}