-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathpubsub.go
110 lines (90 loc) · 2.59 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
/*
PubSub package provides simple mechanism to implement publisher subscriber relation.
type Timer struct {
pubsub.Publisher
}
timer := new(Timer)
go func() {
for {
time.Sleep(time.Second)
timer.Publish(time.Now())
}
}()
reader, _ := timer.SubReader()
for {
fmt.Println(reader.Read())
}
Memory considerations: memory consumption increases if subscribers do not consume messages as fast as published by Publisher.
There's no need to unsubscribe explicitelly, once SubReader reference is lost GC takes care of it. The same applies to subscription channel.
You might want to hide Publish method in composition scenarios:
type Timer struct {
p pubsub.Publisher
}
In that case you need to provide access to SubReader and SubChannel methods:
func (t *Timer) SubReader() (pubsub.Reader, interface{}) {
return t.p.SubReader()
}
*/
package pubsub
import (
"sync"
)
// Subscription Reader is used to read messages published by Publisher
type SubReader interface {
// Read operation blocks and waits for message from Publisher
Read() interface{}
}
// Publisher is used to publish messages. Can be directly created.
type Publisher struct {
m sync.Mutex
lastMsg *msg
}
type subscriber struct{ in chan *msg }
type msg struct {
val interface{}
next chan *msg
}
func newMsg(val interface{}) *msg { return &msg{val: val, next: make(chan *msg, 1)} }
// Publish publishes a message to all existing subscribers
func (p *Publisher) Publish(val interface{}) {
p.m.Lock()
defer p.m.Unlock()
msg := newMsg(val)
if p.lastMsg != nil {
p.lastMsg.next <- msg
}
p.lastMsg = msg
}
// SubReader returns a new reader for reading published messages and a last published message.
func (p *Publisher) SubReader() (reader SubReader, lastMsg interface{}) {
p.m.Lock()
defer p.m.Unlock()
if p.lastMsg == nil {
p.lastMsg = newMsg(nil)
}
return &subscriber{p.lastMsg.next}, p.lastMsg.val
}
// SubChannel returns a new channel for reading published messages and a last published message.
// If published messages equals (==) finalMsg then channel is closed afer putting message into channel.
func (p *Publisher) SubChannel(finalMsg interface{}) (msgChan <-chan interface{}, lastMsg interface{}) {
listener, cur := p.SubReader()
outch := make(chan interface{})
go listen(listener, outch, finalMsg)
return outch, cur
}
func listen(subscriber SubReader, ch chan interface{}, finalMsg interface{}) {
defer close(ch)
for {
state := subscriber.Read()
ch <- state
if state == finalMsg {
return
}
}
}
func (s *subscriber) Read() interface{} {
msg := <-s.in
s.in <- msg
s.in = msg.next
return msg.val
}