classical.go

package lfring

import (
	"sync/atomic"
)

type classical[T any] struct {
	head     uint64
	tail     uint64
	capacity uint64
	mask     uint64
	element  []*T
}

// newClassical creates the classical ring buffer implementation backed by a slice of *T.
// The capacity is expected to be a power of two, since the index mask is computed as
// capacity-1.
func newClassical[T any](capacity uint64) RingBuffer[T] {
	return &classical[T]{
		head:     uint64(0),
		tail:     uint64(0),
		capacity: capacity,
		mask:     capacity - 1,
		element:  make([]*T, capacity),
	}
}
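
// A minimal usage sketch, not part of the original file: build the buffer through
// newClassical and move values with Offer/Poll, assuming the RingBuffer interface
// exposes both methods as implemented below. The element type int and the capacity of 8
// are arbitrary illustration choices; the capacity only needs to be a power of two so
// that the mask-based indexing works.
func exampleClassicalUsage() {
	rb := newClassical[int](8)

	// Offer reports false when the buffer is full or the target slot is still occupied,
	// so callers retry later or drop the value.
	if ok := rb.Offer(42); !ok {
		// buffer full: retry or discard
	}

	// Poll reports false when nothing is ready to be consumed.
	if v, ok := rb.Poll(); ok {
		_ = v // consume v
	}
}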

// Offer tries to publish value at the next tail slot. It returns false when the buffer
// is full, the target slot has not been cleared by a consumer yet, or another producer
// won the CAS on tail.
func (r *classical[T]) Offer(value T) (success bool) {
	oldTail := atomic.LoadUint64(&r.tail)
	oldHead := atomic.LoadUint64(&r.head)
	if r.isFull(oldTail, oldHead) {
		return false
	}

	newTail := oldTail + 1
	tailNode := r.element[newTail&r.mask]
	// the slot has not been cleared by a consumer yet
	if tailNode != nil {
		return false
	}

	if !atomic.CompareAndSwapUint64(&r.tail, oldTail, newTail) {
		return false
	}

	r.element[newTail&r.mask] = &value
	return true
}

// SingleProducerOffer publishes values pulled from valueSupplier until the buffer is
// full, an undrained slot is reached, or the supplier reports finish. It must only be
// called from a single producer goroutine, since tail is read without an atomic load.
func (r *classical[T]) SingleProducerOffer(valueSupplier func() (v T, finish bool)) {
	oldTail := r.tail
	oldHead := atomic.LoadUint64(&r.head)
	if r.isFull(oldTail, oldHead) {
		return
	}

	newTail := oldTail + 1
	for ; newTail-oldHead < r.capacity; newTail++ {
		tailNode := r.element[newTail&r.mask]
		// the slot has not been cleared by a consumer yet
		if tailNode != nil {
			break
		}

		v, finish := valueSupplier()
		if finish {
			break
		}

		r.element[newTail&r.mask] = &v
	}

	atomic.StoreUint64(&r.tail, newTail-1)
}
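
// A sketch of driving the supplier callback above, not part of the original file: the
// supplier pulls from a local slice and signals finish once it runs out, so
// SingleProducerOffer publishes as many values as currently fit. The exampleBatchOffer
// name and the concrete *classical receiver are illustration choices only.
func exampleBatchOffer(r *classical[int], pending []int) {
	i := 0
	r.SingleProducerOffer(func() (int, bool) {
		if i >= len(pending) {
			return 0, true // no more values: signal finish
		}
		v := pending[i]
		i++
		return v, false
	})
}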

// Poll tries to consume the value at the next head slot. It reports false when the
// buffer is empty, the slot has not been published yet, or another consumer won the
// CAS on head.
func (r *classical[T]) Poll() (value T, success bool) {
	oldTail := atomic.LoadUint64(&r.tail)
	oldHead := atomic.LoadUint64(&r.head)
	if r.isEmpty(oldTail, oldHead) {
		return
	}

	newHead := oldHead + 1
	headNode := r.element[newHead&r.mask]
	// not published yet
	if headNode == nil {
		return
	}

	if !atomic.CompareAndSwapUint64(&r.head, oldHead, newHead) {
		return
	}

	r.element[newHead&r.mask] = nil
	return *headNode, true
}

// SingleConsumerPoll drains published values and hands each of them to valueConsumer.
// It must only be called from a single consumer goroutine, since head is read without
// an atomic load.
func (r *classical[T]) SingleConsumerPoll(valueConsumer func(T)) {
	oldTail := atomic.LoadUint64(&r.tail)
	oldHead := r.head
	if r.isEmpty(oldTail, oldHead) {
		return
	}

	currHead := oldHead + 1
	for ; currHead <= oldTail; currHead++ {
		currNode := r.element[currHead&r.mask]
		// not published yet
		if currNode == nil {
			break
		}

		valueConsumer(*currNode)
		r.element[currHead&r.mask] = nil
	}

	atomic.StoreUint64(&r.head, currHead-1)
}

// SingleConsumerPollVec drains published values into ret and returns the number of
// valid entries written. It must only be called from a single consumer goroutine, and
// ret must be large enough to receive all drained elements.
func (r *classical[T]) SingleConsumerPollVec(ret []T) (validCnt uint64) {
	oldTail := atomic.LoadUint64(&r.tail)
	oldHead := r.head
	if r.isEmpty(oldTail, oldHead) {
		return
	}

	currHead := oldHead + 1
	for ; currHead <= oldTail; currHead++ {
		currNode := r.element[currHead&r.mask]
		// not published yet
		if currNode == nil {
			break
		}

		ret[currHead-oldHead-1] = *currNode
		r.element[currHead&r.mask] = nil
	}

	atomic.StoreUint64(&r.head, currHead-1)
	return currHead - oldHead - 1
}
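
// An illustrative drain helper, not part of the original file, pairing the two
// single-consumer methods above: the callback form streams values one by one, while the
// vectorized form copies them into a caller-owned slice sized to the buffer capacity.
func exampleDrain(r *classical[string]) {
	// Callback form: every published value is handed to the consumer function.
	r.SingleConsumerPoll(func(s string) {
		_ = s // handle s
	})

	// Vectorized form: ret is sized to capacity so it can hold everything drainable.
	ret := make([]string, r.capacity)
	n := r.SingleConsumerPollVec(ret)
	for _, s := range ret[:n] {
		_ = s // handle s
	}
}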

// isFull checks whether the buffer is full by comparing (tail - head).
// Because tail and head are read with two separate loads, the observed tail may be
// smaller than the observed head, even though that never happens in the buffer's real
// state:
//
// Say a thread reads tail=4 at time point one (when head=3), then waits to be
// scheduled; after a long wait, at time point two (when tail=8), it reads head=7.
// From that thread's point of view, tail=4 and head=7.
//
// Hence, tail < head means the observed tail is far behind the real one (so a CAS on
// tail would definitely fail), and we simply report full so the Offer caller retries.
func (r *classical[T]) isFull(tail uint64, head uint64) bool {
	return tail-head >= r.capacity-1
}
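
// A tiny worked instance, illustrative only, of the stale-read case described above:
// with an observed tail=4 and head=7, the unsigned subtraction tail-head wraps around
// to a huge value, so isFull reports full and the Offer caller simply retries.
func exampleStaleTailLooksFull(r *classical[int]) bool {
	return r.isFull(4, 7) // true, even though the real buffer need not be full
}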

// isEmpty checks whether the buffer is empty by comparing (tail - head).
// As with isFull, the observed tail may be smaller than the observed head, which could
// lead to a wrong result:
//
// Consider consumer c1 reading tail=3 and head=5 while the latest tail is actually 5
// (meaning the buffer is empty). If Poll continued anyway, it could fetch a dirty
// value: maybe nil, which is harmless, or maybe an old value that has not been set to
// nil yet (by the last consumer), which is fatal.
//
// To keep the ring buffer correct, we must return true both when tail < head and when
// tail == head.
func (r *classical[T]) isEmpty(tail uint64, head uint64) bool {
	return (tail < head) || (tail-head == 0)
}