-
Notifications
You must be signed in to change notification settings - Fork 189
/
Copy pathgossiplww.go
76 lines (64 loc) · 1.95 KB
/
gossiplww.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package pubsub
import (
"context"
host "github.com/libp2p/go-libp2p-host"
protocol "github.com/libp2p/go-libp2p-protocol"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
)
// LWWMessageCache is a message cache that stores messages by ID and tracks,
// for each topic, the ID of a single message chosen by a last-writer-wins
// comparison.
type LWWMessageCache struct {
// IsNewerThan decides whether a topic's tracked message is replaced. Put calls
// it as IsNewerThan(incoming, tracked) and replaces the tracked message when
// it returns true.
// NOTE(review): the parameter names (currentMsg, incomingMsg) read as the
// opposite order from how Put passes its arguments — confirm which is intended.
IsNewerThan func(currentMsg, incomingMsg *pb.Message) bool
// ComputeID returns the ID under which a message is indexed in IDMsgMap.
ComputeID func(msg *pb.Message) string
// topicMsgIDMap maps a topic to the ID of the message currently tracked for it.
topicMsgIDMap map[string]string
// IDMsgMap maps a message ID to the stored message.
IDMsgMap map[string]*pb.Message
}
// NewLWWMessageCache returns an empty LWWMessageCache that uses IsNewerThan to
// compare messages and ComputeID to derive message IDs. Both internal maps are
// allocated up front so the cache is ready for use.
func NewLWWMessageCache(IsNewerThan func(currentMsg, incomingMsg *pb.Message) bool, ComputeID func(msg *pb.Message) string) *LWWMessageCache {
	cache := new(LWWMessageCache)
	cache.IsNewerThan = IsNewerThan
	cache.ComputeID = ComputeID
	cache.topicMsgIDMap = map[string]string{}
	cache.IDMsgMap = map[string]*pb.Message{}
	return cache
}
// Put stores msg in the cache and, for every topic in msg.TopicIDs, makes msg
// the topic's tracked message when it wins the last-writer-wins comparison.
//
// The message is indexed under mc.ComputeID(msg); an entry already stored under
// that ID is left untouched. A topic adopts msg when:
//   - it has no tracked message yet,
//   - its tracked ID is no longer present in IDMsgMap (otherwise the topic
//     could never be updated again and would keep advertising an ID that Get
//     cannot serve), or
//   - mc.IsNewerThan(msg, tracked) reports true.
//
// Put never removes entries from IDMsgMap.
func (mc *LWWMessageCache) Put(msg *pb.Message) {
	mid := mc.ComputeID(msg)
	if _, ok := mc.IDMsgMap[mid]; !ok {
		mc.IDMsgMap[mid] = msg
	}

	for _, topic := range msg.TopicIDs {
		if lastMsgID, tracked := mc.topicMsgIDMap[topic]; tracked {
			// Keep the tracked message only when it still exists and the
			// incoming message (first argument) does not supersede it.
			if lastMsg, ok := mc.IDMsgMap[lastMsgID]; ok && !mc.IsNewerThan(msg, lastMsg) {
				continue
			}
		}
		mc.topicMsgIDMap[topic] = mid
	}
}
// Get looks up the message stored under mid. The boolean reports whether an
// entry exists for that ID.
func (mc *LWWMessageCache) Get(mid string) (*pb.Message, bool) {
	if msg, found := mc.IDMsgMap[mid]; found {
		return msg, true
	}
	return nil, false
}
// GetGossipIDs returns the ID tracked for topic as a one-element slice, or an
// empty (non-nil) slice when no message is tracked for that topic.
func (mc *LWWMessageCache) GetGossipIDs(topic string) []string {
	latest, known := mc.topicMsgIDMap[topic]
	if !known {
		return []string{}
	}
	return []string{latest}
}
// Shift does nothing.
// NOTE(review): presumably kept for API parity with other message caches in
// this package — confirm against the callers.
func (mc *LWWMessageCache) Shift() {
	// Intentionally empty.
}
// Compile-time assertion that *LWWMessageCache satisfies MessageCacheReader.
var _ MessageCacheReader = (*LWWMessageCache)(nil)
// NewGossipSyncLWW returns a new PubSub object whose router is built by
// NewGossipConfigurableRouter from a ClassicGossipSubConfiguration that uses
// mcache as its message cache and protocolID as both its protocol and its only
// supported protocol.
//
// WithRouterConfiguration(rt) is placed ahead of the caller's opts, so it is
// the first option passed to NewPubSub.
func NewGossipSyncLWW(ctx context.Context, h host.Host, mcache *LWWMessageCache, protocolID protocol.ID, opts ...Option) (*PubSub, error) {
	rt := NewGossipConfigurableRouter(&ClassicGossipSubConfiguration{
		mcache:             mcache,
		supportedProtocols: []protocol.ID{protocolID},
		protocol:           protocolID,
	})

	// Build a fresh slice so the caller's opts backing array is never written to.
	allOpts := make([]Option, 0, len(opts)+1)
	allOpts = append(allOpts, WithRouterConfiguration(rt))
	allOpts = append(allOpts, opts...)
	return NewPubSub(ctx, h, rt, allOpts...)
}