-
Notifications
You must be signed in to change notification settings - Fork 211
/
Copy pathdefault_reader.go
66 lines (57 loc) · 1.38 KB
/
default_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package disruptor
import (
"io"
"sync/atomic"
)
type DefaultReader struct {
state int64
current *Cursor // this reader has processed up to this sequence
written *Cursor // the ring buffer has been written up to this sequence
upstream Barrier // all of the readers have advanced up to this sequence
waiter WaitStrategy
consumer Consumer
}
func NewReader(current, written *Cursor, upstream Barrier, waiter WaitStrategy, consumer Consumer) *DefaultReader {
return &DefaultReader{
state: stateRunning,
current: current,
written: written,
upstream: upstream,
waiter: waiter,
consumer: consumer,
}
}
func (this *DefaultReader) Read() {
var gateCount, idleCount, lower, upper int64
var current = this.current.Load()
for {
lower = current + 1
upper = this.upstream.Load()
if lower <= upper {
this.consumer.Consume(lower, upper)
this.current.Store(upper)
current = upper
} else if upper = this.written.Load(); lower <= upper {
gateCount++
idleCount = 0
this.waiter.Gate(gateCount)
} else if atomic.LoadInt64(&this.state) == stateRunning {
idleCount++
gateCount = 0
this.waiter.Idle(idleCount)
} else {
break
}
}
if closer, ok := this.consumer.(io.Closer); ok {
_ = closer.Close()
}
}
func (this *DefaultReader) Close() error {
atomic.StoreInt64(&this.state, stateClosed)
return nil
}
const (
stateRunning = iota
stateClosed
)