-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathresponse_queue.go
73 lines (59 loc) · 1.4 KB
/
response_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package fiber
import "sync"
// ResponseQueue exposes a stream of Response values through a channel.
type ResponseQueue interface {
	// Iter returns a receive-only channel of Response values. For the
	// responseQueue implementation in this file, every call yields a new
	// channel that first receives the responses already queued, then any
	// responses appended later, and is closed once the queue is done.
	Iter() <-chan Response
}
// responseQueue is the ResponseQueue implementation used by the constructors
// in this file. It records every response it is given and forwards each one
// to all channels previously handed out by Iter.
type responseQueue struct {
	// lock is held by append and Iter while they touch items and
	// subscriptions.
	// NOTE(review): only Lock/Unlock are called in this file, never RLock;
	// a plain sync.Mutex looks sufficient — confirm no other file uses RLock.
	lock sync.RWMutex
	// items holds every response appended so far, in arrival order; Iter
	// replays it to each new subscriber.
	items []Response
	// buffer is the capacity of each channel created by Iter.
	buffer int
	// subscriptions are the channels created by Iter; append sends every new
	// response to each of them.
	subscriptions []chan Response
	// done is closed when no further responses will be appended. Iter's
	// goroutine waits on it before closing the subscriber channel.
	done chan struct{}
}
// append stores resp in the queue's history and then delivers it to every
// channel currently registered by Iter. The queue lock is held for the whole
// call, so a send that blocks on a full subscriber channel also blocks other
// append and Iter registrations until it completes.
func (r *responseQueue) append(resp Response) {
	r.lock.Lock()
	defer r.lock.Unlock()

	// Record first so that iterators registering later replay this response.
	r.items = append(r.items, resp)

	// Fan the response out to all live subscribers.
	subs := r.subscriptions
	for i := 0; i < len(subs); i++ {
		subs[i] <- resp
	}
}
// Iter returns a new channel, with capacity r.buffer, that receives every
// response in the queue. A background goroutine registers the channel as a
// subscriber, replays the responses recorded so far, waits until the queue is
// done, and then closes the channel.
func (r *responseQueue) Iter() <-chan Response {
	out := make(chan Response, r.buffer)

	// subscribe registers out and replays the recorded history under the
	// lock, so no append can interleave between registration and replay.
	subscribe := func() {
		r.lock.Lock()
		defer r.lock.Unlock()

		r.subscriptions = append(r.subscriptions, out)
		history := r.items
		for i := range history {
			out <- history[i]
		}
	}

	go func() {
		defer close(out)
		subscribe()
		// After done is closed no further append happens, so closing out
		// cannot race with a send from append.
		<-r.done
	}()

	return out
}
// NewResponseQueue returns a ResponseQueue fed by the given channel. A
// background goroutine appends every Response received from in to the queue
// and closes the queue's done channel once in has been closed and drained.
// bufferSize is stored as the capacity of each channel later created by Iter.
func NewResponseQueue(in <-chan Response, bufferSize int) ResponseQueue {
	q := new(responseQueue)
	q.buffer = bufferSize
	q.done = make(chan struct{})

	// Pump the input channel into the queue until the sender closes it.
	go func() {
		defer close(q.done)
		for {
			resp, open := <-in
			if !open {
				return
			}
			q.append(resp)
		}
	}()

	return q
}
// NewResponseQueueFromResponses builds a ResponseQueue that is complete from
// the start: it holds exactly the given responses and its done channel is
// already closed. Each channel returned by Iter therefore receives the
// responses in the order given and is then closed.
//
// The responses are copied, so a caller that passes an existing slice with
// `s...` can keep modifying that slice without affecting the queue.
func NewResponseQueueFromResponses(responses ...Response) ResponseQueue {
	// A slice passed as `s...` is handed over unchanged, sharing the caller's
	// backing array. Copy it so that later writes by the caller cannot alter
	// (or race with) what Iter replays.
	items := make([]Response, len(responses))
	copy(items, responses)

	// There is no producer, so the queue is finished immediately.
	done := make(chan struct{})
	close(done)

	return &responseQueue{
		items: items,
		// Sized to hold every item, so Iter's replay never blocks on a slow
		// consumer.
		buffer: len(items),
		done:   done,
	}
}