-
Notifications
You must be signed in to change notification settings - Fork 2
/
buffer.go
149 lines (129 loc) · 2.74 KB
/
buffer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package muxado
import (
"bytes"
"errors"
"io"
"os"
"sync"
"time"
)
var (
bufferFull = errors.New("buffer is full")
bufferClosed = errors.New("buffer closed previously")
)
type buffer interface {
Read([]byte) (int, error)
ReadFrom(io.Reader) (int64, error)
SetError(error)
SetDeadline(time.Time)
}
type inboundBuffer struct {
cond sync.Cond
mu sync.Mutex
bytes.Buffer
err error
maxSize int
deadline time.Time
timer *time.Timer
}
func (b *inboundBuffer) Init(maxSize int) {
b.cond.L = &b.mu
b.maxSize = maxSize
}
func (b *inboundBuffer) ReadFrom(rd io.Reader) (n int64, err error) {
var n64 int64
b.mu.Lock()
if b.err != nil {
if _, err = io.ReadAll(rd); err == nil {
err = bufferClosed
}
goto DONE
}
n64, err = b.Buffer.ReadFrom(rd)
n += n64
if b.Buffer.Len() > b.maxSize {
err = bufferFull
b.err = bufferFull
}
b.cond.Broadcast()
DONE:
b.mu.Unlock()
return n, err
}
// Notify readers that the deadline has arrived.
func (b *inboundBuffer) notifyDeadline() {
// It's important that the mutex is locked for this. It ensures that an
// in-flight timer can't broadcast before a reader gets to its condvar.Wait().
b.mu.Lock()
b.cond.Broadcast()
b.mu.Unlock()
}
// Start or reset the timer.
// Must be called when the mutex is locked.
func (b *inboundBuffer) startTimerLocked(timeout time.Duration) {
if b.timer == nil {
b.timer = time.AfterFunc(timeout, b.notifyDeadline)
} else {
b.timer.Reset(timeout)
}
}
// Stops a timer, if one is set.
// Must be called while the mutex is locked.
func (b *inboundBuffer) stopTimerLocked() {
if b.timer != nil {
b.timer.Stop()
}
}
func (b *inboundBuffer) Read(p []byte) (n int, err error) {
b.mu.Lock()
for {
// If the deadline is set, we need to take it into account
if !b.deadline.IsZero() {
// If the deadline is in the past, bail out.
// SetDeadline will ensure that we get woken back up if it expires.
if time.Until(b.deadline) < 0 {
n = 0
err = os.ErrDeadlineExceeded
break
}
}
if b.Len() != 0 {
n, err = b.Buffer.Read(p)
break
}
if b.err != nil {
err = b.err
break
}
b.cond.Wait()
}
b.mu.Unlock()
return
}
func (b *inboundBuffer) SetError(err error) {
b.mu.Lock()
b.err = err
b.cond.Broadcast()
b.mu.Unlock()
}
func (b *inboundBuffer) SetDeadline(t time.Time) {
b.mu.Lock()
// Set the deadline and notify any readers that they need to take heed.
// They'll figure out all of the timer management for us.
b.deadline = t
if timeout := time.Until(t); timeout > 0 {
b.startTimerLocked(timeout)
} else {
b.stopTimerLocked()
}
b.cond.Broadcast()
b.mu.Unlock()
}
func (b *inboundBuffer) Close() error {
b.mu.Lock()
b.stopTimerLocked()
b.err = io.EOF
b.cond.Broadcast()
b.mu.Unlock()
return nil
}