Skip to content

Commit

Permalink
identify: Fix flaky tests (#1555)
Browse files Browse the repository at this point in the history
* Fix flaky timing dependent tests

* Update go-libp2p-peerstore dependency

* Register notifiee synchronously

* Only a single connection

* Remove WaitForDisconnectNotification hack since notifs are now synchronous

* Add debug logging to identify tests

* Close chan once
  • Loading branch information
MarcoPolo authored Jun 2, 2022
1 parent 3517eae commit e6379f5
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 30 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/libp2p/go-libp2p-asn-util v0.2.0
github.com/libp2p/go-libp2p-circuit v0.6.0
github.com/libp2p/go-libp2p-core v0.16.1
github.com/libp2p/go-libp2p-peerstore v0.6.0
github.com/libp2p/go-libp2p-peerstore v0.7.0
github.com/libp2p/go-libp2p-resource-manager v0.3.0
github.com/libp2p/go-libp2p-testing v0.9.2
github.com/libp2p/go-mplex v0.7.0
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -414,8 +414,9 @@ github.com/libp2p/go-libp2p-core v0.14.0/go.mod h1:tLasfcVdTXnixsLB0QYaT1syJOhsb
github.com/libp2p/go-libp2p-core v0.16.1 h1:bWoiEBqVkpJ13hbv/f69tHODp86t6mvc4fBN4DkK73M=
github.com/libp2p/go-libp2p-core v0.16.1/go.mod h1:O3i/7y+LqUb0N+qhzXjBjjpchgptWAVMG1Voegk7b4c=
github.com/libp2p/go-libp2p-mplex v0.5.0/go.mod h1:eLImPJLkj3iG5t5lq68w3Vm5NAQ5BcKwrrb2VmOYb3M=
github.com/libp2p/go-libp2p-peerstore v0.6.0 h1:HJminhQSGISBIRb93N6WK3t6Fa8OOTnHd/VBjL4mY5A=
github.com/libp2p/go-libp2p-peerstore v0.6.0/go.mod h1:DGEmKdXrcYpK9Jha3sS7MhqYdInxJy84bIPtSu65bKc=
github.com/libp2p/go-libp2p-peerstore v0.7.0 h1:2iIUwok3vtmnWJTZeTeLgnBO6GbkXcwSRwgZHEKrQZs=
github.com/libp2p/go-libp2p-peerstore v0.7.0/go.mod h1:cdUWTHro83vpg6unCpGUr8qJoX3e93Vy8o97u5ppIM0=
github.com/libp2p/go-libp2p-pnet v0.2.0 h1:J6htxttBipJujEjz1y0a5+eYoiPcFHhSYHH6na5f0/k=
github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA=
github.com/libp2p/go-libp2p-quic-transport v0.16.0 h1:aVg9/jr+R2esov5sH7wkXrmYmqJiUjtLMLYX3L9KYdY=
Expand Down
21 changes: 20 additions & 1 deletion p2p/net/swarm/testing/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,29 @@ type config struct {
connectionGater connmgr.ConnectionGater
rcmgr network.ResourceManager
sk crypto.PrivKey
clock
}

// clock is the time source used by the test swarm configuration. GenSwarm
// forwards the configured clock to the peerstore it creates, and WithClock
// lets a test substitute its own implementation.
type clock interface {
// Now reports the current time according to this clock.
Now() time.Time
}

// realclock is the clock implementation backed by the system time; GenSwarm
// installs it as the default before applying any options.
type realclock struct{}

// Now returns the current time as reported by time.Now.
func (realclock) Now() time.Time {
	return time.Now()
}

// Option is an option that can be passed when constructing a test swarm.
type Option func(*testing.T, *config)

// WithClock returns an Option that replaces the clock stored in the swarm
// test config with the supplied one. GenSwarm hands the configured clock to
// the peerstore it builds, so a test can control the time that peerstore sees.
func WithClock(clock clock) Option {
	apply := func(_ *testing.T, cfg *config) {
		cfg.clock = clock
	}
	return apply
}

// OptDisableReuseport disables reuseport in this test swarm.
var OptDisableReuseport Option = func(_ *testing.T, c *config) {
c.disableReuseport = true
Expand Down Expand Up @@ -105,6 +123,7 @@ func GenUpgrader(t *testing.T, n *swarm.Swarm, opts ...tptu.Option) transport.Up
// GenSwarm generates a new test swarm.
func GenSwarm(t *testing.T, opts ...Option) *swarm.Swarm {
var cfg config
cfg.clock = realclock{}
for _, o := range opts {
o(t, &cfg)
}
Expand All @@ -124,7 +143,7 @@ func GenSwarm(t *testing.T, opts ...Option) *swarm.Swarm {
p.Addr = tnet.ZeroLocalTCPAddress
}

ps, err := pstoremem.NewPeerstore()
ps, err := pstoremem.NewPeerstore(pstoremem.WithClock(cfg.clock))
require.NoError(t, err)
ps.AddPubKey(p.ID, p.PubKey)
ps.AddPrivKey(p.ID, p.PrivKey)
Expand Down
1 change: 1 addition & 0 deletions p2p/protocol/identify/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ func (ids *idService) IdentifyWait(c network.Conn) <-chan struct{} {
go func() {
defer close(wait)
if err := ids.identifyConn(c); err != nil {
log.Warnf("failed to identify %s: %s", c.RemotePeer(), err)
ids.emitters.evtPeerIdentificationFailed.Emit(event.EvtPeerIdentificationFailed{Peer: c.RemotePeer(), Reason: err})
return
}
Expand Down
84 changes: 57 additions & 27 deletions p2p/protocol/identify/id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/libp2p/go-libp2p"
blhost "github.com/libp2p/go-libp2p/p2p/host/blank"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
pb "github.com/libp2p/go-libp2p/p2p/protocol/identify/pb"
Expand All @@ -28,17 +29,22 @@ import (

"github.com/libp2p/go-eventbus"
"github.com/libp2p/go-libp2p-peerstore/pstoremem"
"github.com/libp2p/go-libp2p-testing/race"

mockClock "github.com/benbjohnson/clock"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-msgio/protoio"
ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// init raises the "net/identify" logger to debug level for every test in
// this package, so that identify failures leave a trace in the test output.
func init() {
	// The returned error is intentionally discarded: logging verbosity is a
	// best-effort debugging aid and must not prevent the tests from running.
	_ = logging.SetLogLevel("net/identify", "debug")
}

// testKnowsAddrs stops the test immediately unless the addresses h's
// peerstore holds for peer p contain exactly the elements of expected
// (element order is not significant).
func testKnowsAddrs(t *testing.T, h host.Host, p peer.ID, expected []ma.Multiaddr) {
	t.Helper()
	// One require call performs the comparison once and aborts on mismatch;
	// the format arguments are passed through instead of pre-formatting the
	// message and feeding the result back in as a format string.
	require.ElementsMatchf(t, expected, h.Peerstore().Addrs(p), "%s did not have addr for %s", h.ID(), p)
}

func testHasCertifiedAddrs(t *testing.T, h host.Host, p peer.ID, expected []ma.Multiaddr) {
Expand All @@ -62,7 +68,7 @@ func testHasCertifiedAddrs(t *testing.T, h host.Host, p peer.ID, expected []ma.M
if !ok {
t.Error("unexpected record type")
}
assert.ElementsMatchf(t, expected, rec.Addrs, fmt.Sprintf("%s did not have certified addr for %s", h.ID(), p))
require.True(t, assert.ElementsMatchf(t, expected, rec.Addrs, fmt.Sprintf("%s did not have certified addr for %s", h.ID(), p)))
}

func testHasProtocolVersions(t *testing.T, h host.Host, p peer.ID) {
Expand Down Expand Up @@ -147,15 +153,15 @@ func emitAddrChangeEvt(t *testing.T, h host.Host) {
// id service is done.
func TestIDService(t *testing.T) {
// This test is highly timing dependent, waiting on timeouts/expiration.
if race.WithRace() {
t.Skip("skipping highly timing dependent test when race detector is enabled")
}
oldTTL := peerstore.RecentlyConnectedAddrTTL
peerstore.RecentlyConnectedAddrTTL = 500 * time.Millisecond
t.Cleanup(func() { peerstore.RecentlyConnectedAddrTTL = oldTTL })

h1 := blhost.NewBlankHost(swarmt.GenSwarm(t))
h2 := blhost.NewBlankHost(swarmt.GenSwarm(t))
clk := mockClock.NewMock()
swarm1 := swarmt.GenSwarm(t, swarmt.WithClock(clk))
swarm2 := swarmt.GenSwarm(t, swarmt.WithClock(clk))
h1 := blhost.NewBlankHost(swarm1)
h2 := blhost.NewBlankHost(swarm2)

h1p := h1.ID()
h2p := h2.ID()
Expand Down Expand Up @@ -212,6 +218,8 @@ func TestIDService(t *testing.T) {
testHasPublicKey(t, h2, h1p, h1.Peerstore().PubKey(h1p)) // h1 should have h2's public key

// Need both sides to actually notice that the connection has been closed.
sentDisconnect1 := waitForDisconnectNotification(swarm1)
sentDisconnect2 := waitForDisconnectNotification(swarm2)
h1.Network().ClosePeer(h2p)
h2.Network().ClosePeer(h1p)
if len(h2.Network().ConnsToPeer(h1.ID())) != 0 || len(h1.Network().ConnsToPeer(h2.ID())) != 0 {
Expand All @@ -225,10 +233,13 @@ func TestIDService(t *testing.T) {
testHasCertifiedAddrs(t, h1, h2p, h2.Peerstore().Addrs(h2p))
testHasCertifiedAddrs(t, h2, h1p, h1.Peerstore().Addrs(h1p))

<-sentDisconnect1
<-sentDisconnect2

// the addrs had their TTLs reduced on disconnect, and
// will be forgotten soon after
t.Log("testing addrs after TTL expiration")
time.Sleep(time.Second)
clk.Add(time.Second)
testKnowsAddrs(t, h1, h2p, []ma.Multiaddr{})
testKnowsAddrs(t, h2, h1p, []ma.Multiaddr{})
testHasCertifiedAddrs(t, h1, h2p, []ma.Multiaddr{})
Expand Down Expand Up @@ -794,13 +805,11 @@ func TestLargeIdentifyMessage(t *testing.T) {
peerstore.RecentlyConnectedAddrTTL = 500 * time.Millisecond
t.Cleanup(func() { peerstore.RecentlyConnectedAddrTTL = oldTTL })

sk1, _, err := coretest.RandTestKeyPair(ic.RSA, 4096)
require.NoError(t, err)
sk2, _, err := coretest.RandTestKeyPair(ic.RSA, 4096)
require.NoError(t, err)

h1 := blhost.NewBlankHost(swarmt.GenSwarm(t, swarmt.OptPeerPrivateKey(sk1)))
h2 := blhost.NewBlankHost(swarmt.GenSwarm(t, swarmt.OptPeerPrivateKey(sk2)))
clk := mockClock.NewMock()
swarm1 := swarmt.GenSwarm(t, swarmt.WithClock(clk))
swarm2 := swarmt.GenSwarm(t, swarmt.WithClock(clk))
h1 := blhost.NewBlankHost(swarm1)
h2 := blhost.NewBlankHost(swarm2)

// add protocol strings to make the message larger
// about 2K of protocol strings
Expand Down Expand Up @@ -833,10 +842,12 @@ func TestLargeIdentifyMessage(t *testing.T) {
forgetMe, _ := ma.NewMultiaddr("/ip4/1.2.3.4/tcp/1234")
h2.Peerstore().AddAddr(h1p, forgetMe, peerstore.RecentlyConnectedAddrTTL)

require.NoError(t, h1.Connect(context.Background(), h2.Peerstore().PeerInfo(h2p)))
h2pi := h2.Peerstore().PeerInfo(h2p)
h2pi.Addrs = h2pi.Addrs[:1]
require.NoError(t, h1.Connect(context.Background(), h2pi))

h1t2c := h1.Network().ConnsToPeer(h2p)
require.NotEmpty(t, h1t2c, "should have a conn here")
require.Equal(t, 1, len(h1t2c), "should have a conn here")

ids1.IdentifyConn(h1t2c[0])

Expand All @@ -851,7 +862,7 @@ func TestLargeIdentifyMessage(t *testing.T) {
// now, this wait we do have to do. it's the wait for the Listening side
// to be done identifying the connection.
c := h2.Network().ConnsToPeer(h1.ID())
if len(c) < 1 {
if len(c) != 1 {
t.Fatal("should have connection by now at least.")
}
ids2.IdentifyConn(c[0])
Expand All @@ -864,6 +875,8 @@ func TestLargeIdentifyMessage(t *testing.T) {
testHasPublicKey(t, h2, h1p, h1.Peerstore().PubKey(h1p)) // h1 should have h2's public key

// Need both sides to actually notice that the connection has been closed.
sentDisconnect1 := waitForDisconnectNotification(swarm1)
sentDisconnect2 := waitForDisconnectNotification(swarm2)
h1.Network().ClosePeer(h2p)
h2.Network().ClosePeer(h1p)
if len(h2.Network().ConnsToPeer(h1.ID())) != 0 || len(h1.Network().ConnsToPeer(h2.ID())) != 0 {
Expand All @@ -877,10 +890,13 @@ func TestLargeIdentifyMessage(t *testing.T) {
testHasCertifiedAddrs(t, h1, h2p, h2.Peerstore().Addrs(h2p))
testHasCertifiedAddrs(t, h2, h1p, h1.Peerstore().Addrs(h1p))

<-sentDisconnect1
<-sentDisconnect2

// the addrs had their TTLs reduced on disconnect, and
// will be forgotten soon after
t.Log("testing addrs after TTL expiration")
time.Sleep(time.Second)
clk.Add(time.Second)
testKnowsAddrs(t, h1, h2p, []ma.Multiaddr{})
testKnowsAddrs(t, h2, h1p, []ma.Multiaddr{})
testHasCertifiedAddrs(t, h1, h2p, []ma.Multiaddr{})
Expand All @@ -898,13 +914,8 @@ func TestLargePushMessage(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

sk1, _, err := coretest.RandTestKeyPair(ic.RSA, 4096)
require.NoError(t, err)
sk2, _, err := coretest.RandTestKeyPair(ic.RSA, 4096)
require.NoError(t, err)

h1 := blhost.NewBlankHost(swarmt.GenSwarm(t, swarmt.OptPeerPrivateKey(sk1)))
h2 := blhost.NewBlankHost(swarmt.GenSwarm(t, swarmt.OptPeerPrivateKey(sk2)))
h1 := blhost.NewBlankHost(swarmt.GenSwarm(t))
h2 := blhost.NewBlankHost(swarmt.GenSwarm(t))

// add protocol strings to make the message larger
// about 2K of protocol strings
Expand Down Expand Up @@ -1109,3 +1120,22 @@ func waitForAddrInStream(t *testing.T, s <-chan ma.Multiaddr, expected ma.Multia
}
}
}

// waitForDisconnectNotification registers a notifiee on the given swarm and
// returns a channel that is closed after the swarm delivers its first
// Disconnected notification. The notifiee is unregistered before the channel
// is closed, and later disconnects are ignored.
func waitForDisconnectNotification(swarm *swarm.Swarm) <-chan struct{} {
	var (
		signalled = make(chan struct{})
		fireOnce  sync.Once
		bundle    *network.NotifyBundle
	)

	onDisconnect := func(network.Network, network.Conn) {
		fireOnce.Do(func() {
			// The cleanup runs on its own goroutine rather than inside the
			// callback. NOTE(review): presumably this avoids calling
			// StopNotify while the swarm is still dispatching the
			// notification; confirm against the swarm's locking.
			go func() {
				swarm.StopNotify(bundle)
				close(signalled)
			}()
		})
	}

	bundle = &network.NotifyBundle{DisconnectedF: onDisconnect}
	swarm.Notify(bundle)

	return signalled
}

0 comments on commit e6379f5

Please sign in to comment.