-
Notifications
You must be signed in to change notification settings - Fork 20
/
peer.go
167 lines (136 loc) · 3.19 KB
/
peer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package bcache
import (
"github.com/weaveworks/mesh"
)
type peer struct {
cc *cache
name mesh.PeerName
send mesh.Gossip
actionCh chan func()
quitCh chan struct{}
logger Logger
}
func newPeer(name mesh.PeerName, maxKeys int, logger Logger) (*peer, error) {
cc, err := newCache(maxKeys)
if err != nil {
return nil, err
}
p := &peer{
cc: cc,
name: name,
send: nil, // must be registered
actionCh: make(chan func()),
quitCh: make(chan struct{}),
logger: logger,
}
go p.loop()
return p, nil
}
// register the result of a mesh.Router.NewGossip.
func (p *peer) register(send mesh.Gossip) {
p.actionCh <- func() {
p.send = send
}
}
// Gossip implements mesh.Gossiper.Gossip
func (p *peer) Gossip() mesh.GossipData {
return p.cc.Messages()
}
// OnGossip merges received data into state and returns "everything new
// I've just learnt", or nil if nothing in the received data was new.
//
// It implements mesh.Gossiper.OnGossip
func (p *peer) OnGossip(buf []byte) (delta mesh.GossipData, err error) {
msg, err := newMessageFromBuf(buf)
if err != nil {
return
}
var deltaMsg *message
delta = p.cc.mergeNew(msg)
if delta != nil {
deltaMsg = delta.(*message)
}
p.logger.Debugf("[%d]OnGossip %v => delta %v", p.name, msg, deltaMsg)
return
}
// OnGossipBroadcast merges received data into state and returns a
// representation of the received data (typically a delta) for further
// propagation.
//
// It implements mesh.Gossiper.OnGossipBroadcast
func (p *peer) OnGossipBroadcast(src mesh.PeerName, update []byte) (received mesh.GossipData, err error) {
if src == p.name { // message from ourself, is it possible?
return
}
msg, err := newMessageFromBuf(update)
if err != nil {
return
}
var recvMsg *message
received = p.cc.mergeDelta(msg)
if received != nil {
recvMsg = received.(*message)
}
p.logger.Debugf("[%d]OnGossipBroadcast %v => delta %v", p.name, msg, recvMsg)
return
}
func (p *peer) OnGossipUnicast(src mesh.PeerName, update []byte) error {
msg, err := newMessageFromBuf(update)
if err != nil {
return err
}
p.cc.mergeComplete(msg)
return nil
}
func (p *peer) Set(key, val string, expiredTimestamp int64) {
c := make(chan struct{})
p.actionCh <- func() {
defer close(c)
// set our cache
p.cc.Set(key, val, expiredTimestamp, 0)
// construct & send the message
m := newMessage(p.name, 1)
m.add(key, val, expiredTimestamp, 0)
p.broadcast(m)
}
<-c // wait for it to be finished
}
func (p *peer) Delete(key string, deleteTimestamp int64) bool {
var (
c = make(chan struct{})
exist bool
)
p.actionCh <- func() {
defer close(c)
// delete from our cache
val, expired, exist := p.cc.Delete(key, deleteTimestamp)
if !exist {
return
}
// construct & send the message
m := newMessage(p.name, 1)
m.add(key, val, expired, deleteTimestamp)
p.broadcast(m)
}
<-c // wait for it to be finished
return exist
}
func (p *peer) Get(key string) (string, bool) {
return p.cc.Get(key)
}
func (p *peer) loop() {
for {
select {
case f := <-p.actionCh:
f()
case <-p.quitCh:
return
}
}
}
func (p *peer) broadcast(msg *message) {
if p.send == nil {
return
}
p.send.GossipBroadcast(msg)
}