forked from libp2p/go-libp2p-pubsub
-
Notifications
You must be signed in to change notification settings - Fork 0
/
gossipsub_connmgr_test.go
168 lines (141 loc) · 4.72 KB
/
gossipsub_connmgr_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
package pubsub
import (
"context"
"testing"
"time"
"github.com/benbjohnson/clock"
"github.com/libp2p/go-libp2p-core/host"
swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
connmgr "github.com/libp2p/go-libp2p-connmgr"
"github.com/libp2p/go-libp2p-core/peer"
bhost "github.com/libp2p/go-libp2p/p2p/host/blank"
)
func TestGossipsubConnTagMessageDeliveries(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
oldGossipSubD := GossipSubD
oldGossipSubDlo := GossipSubDlo
oldGossipSubDHi := GossipSubDhi
oldGossipSubConnTagDecayInterval := GossipSubConnTagDecayInterval
oldGossipSubConnTagMessageDeliveryCap := GossipSubConnTagMessageDeliveryCap
oldSilencePeriod := connmgr.SilencePeriod
// set the gossipsub D parameters low, so that we have some peers outside the mesh
GossipSubDlo = 3
GossipSubD = 3
GossipSubDhi = 3
// also set the tag decay interval so we don't have to wait forever for tests
GossipSubConnTagDecayInterval = time.Second
// set the cap for deliveries above GossipSubConnTagValueMeshPeer, so the sybils
// will be forced out even if they end up in someone's mesh
GossipSubConnTagMessageDeliveryCap = 50
connmgr.SilencePeriod = time.Millisecond
// reset globals after test
defer func() {
GossipSubD = oldGossipSubD
GossipSubDlo = oldGossipSubDlo
GossipSubDhi = oldGossipSubDHi
GossipSubConnTagDecayInterval = oldGossipSubConnTagDecayInterval
GossipSubConnTagMessageDeliveryCap = oldGossipSubConnTagMessageDeliveryCap
connmgr.SilencePeriod = oldSilencePeriod
}()
decayClock := clock.NewMock()
decayCfg := connmgr.DecayerCfg{
Resolution: time.Second,
Clock: decayClock,
}
nHonest := 5
nSquatter := 10
connLimit := 10
connmgrs := make([]*connmgr.BasicConnMgr, nHonest)
honestHosts := make([]host.Host, nHonest)
honestPeers := make(map[peer.ID]struct{})
for i := 0; i < nHonest; i++ {
connmgrs[i] = connmgr.NewConnManager(nHonest, connLimit, 0,
connmgr.DecayerConfig(&decayCfg))
netw := swarmt.GenSwarm(t)
defer netw.Close()
h := bhost.NewBlankHost(netw, bhost.WithConnectionManager(connmgrs[i]))
honestHosts[i] = h
honestPeers[h.ID()] = struct{}{}
}
// use flood publishing, so non-mesh peers will still be delivering messages
// to everyone
psubs := getGossipsubs(ctx, honestHosts,
WithFloodPublish(true))
// sybil squatters to be connected later
sybilHosts := getNetHosts(t, ctx, nSquatter)
for _, h := range sybilHosts {
squatter := &sybilSquatter{h: h}
h.SetStreamHandler(GossipSubID_v10, squatter.handleStream)
}
// connect the honest hosts
connectAll(t, honestHosts)
for _, h := range honestHosts {
if len(h.Network().Conns()) != nHonest-1 {
t.Errorf("expected to have conns to all honest peers, have %d", len(h.Network().Conns()))
}
}
// subscribe everyone to the topic
topic := "test"
for _, ps := range psubs {
_, err := ps.Subscribe(topic)
if err != nil {
t.Fatal(err)
}
}
// sleep to allow meshes to form
time.Sleep(2 * time.Second)
// have all the hosts publish enough messages to ensure that they get some delivery credit
nMessages := GossipSubConnTagMessageDeliveryCap * 2
for _, ps := range psubs {
for i := 0; i < nMessages; i++ {
ps.Publish(topic, []byte("hello"))
}
}
// advance the fake time for the tag decay
decayClock.Add(time.Second)
// verify that they've given each other delivery connection tags
tag := "pubsub-deliveries:test"
for _, h := range honestHosts {
for _, h2 := range honestHosts {
if h.ID() == h2.ID() {
continue
}
val := getTagValue(h.ConnManager(), h2.ID(), tag)
if val == 0 {
t.Errorf("Expected non-zero delivery tag value for peer %s", h2.ID())
}
}
}
// now connect the sybils to put pressure on the real hosts' connection managers
allHosts := append(honestHosts, sybilHosts...)
connectAll(t, allHosts)
// verify that we have a bunch of connections
for _, h := range honestHosts {
if len(h.Network().Conns()) != nHonest+nSquatter-1 {
t.Errorf("expected to have conns to all peers, have %d", len(h.Network().Conns()))
}
}
// force the connection managers to trim, so we don't need to muck about with timing as much
for _, cm := range connmgrs {
cm.TrimOpenConns(ctx)
}
// we should still have conns to all the honest peers, but not the sybils
for _, h := range honestHosts {
nHonestConns := 0
nDishonestConns := 0
for _, conn := range h.Network().Conns() {
if _, ok := honestPeers[conn.RemotePeer()]; !ok {
nDishonestConns++
} else {
nHonestConns++
}
}
if nDishonestConns > connLimit-nHonest {
t.Errorf("expected most dishonest conns to be pruned, have %d", nDishonestConns)
}
if nHonestConns != nHonest-1 {
t.Errorf("expected all honest conns to be preserved, have %d", nHonestConns)
}
}
}