identify: Fix flaky tests #1555

Merged 7 commits on Jun 2, 2022
2 changes: 1 addition & 1 deletion go.mod
@@ -22,7 +22,7 @@ require (
github.com/libp2p/go-libp2p-asn-util v0.2.0
github.com/libp2p/go-libp2p-circuit v0.6.0
github.com/libp2p/go-libp2p-core v0.16.1
github.com/libp2p/go-libp2p-peerstore v0.6.0
github.com/libp2p/go-libp2p-peerstore v0.7.0
github.com/libp2p/go-libp2p-resource-manager v0.3.0
github.com/libp2p/go-libp2p-testing v0.9.2
github.com/libp2p/go-mplex v0.7.0
3 changes: 2 additions & 1 deletion go.sum
@@ -414,8 +414,9 @@ github.com/libp2p/go-libp2p-core v0.14.0/go.mod h1:tLasfcVdTXnixsLB0QYaT1syJOhsb
github.com/libp2p/go-libp2p-core v0.16.1 h1:bWoiEBqVkpJ13hbv/f69tHODp86t6mvc4fBN4DkK73M=
github.com/libp2p/go-libp2p-core v0.16.1/go.mod h1:O3i/7y+LqUb0N+qhzXjBjjpchgptWAVMG1Voegk7b4c=
github.com/libp2p/go-libp2p-mplex v0.5.0/go.mod h1:eLImPJLkj3iG5t5lq68w3Vm5NAQ5BcKwrrb2VmOYb3M=
github.com/libp2p/go-libp2p-peerstore v0.6.0 h1:HJminhQSGISBIRb93N6WK3t6Fa8OOTnHd/VBjL4mY5A=
github.com/libp2p/go-libp2p-peerstore v0.6.0/go.mod h1:DGEmKdXrcYpK9Jha3sS7MhqYdInxJy84bIPtSu65bKc=
github.com/libp2p/go-libp2p-peerstore v0.7.0 h1:2iIUwok3vtmnWJTZeTeLgnBO6GbkXcwSRwgZHEKrQZs=
github.com/libp2p/go-libp2p-peerstore v0.7.0/go.mod h1:cdUWTHro83vpg6unCpGUr8qJoX3e93Vy8o97u5ppIM0=
github.com/libp2p/go-libp2p-pnet v0.2.0 h1:J6htxttBipJujEjz1y0a5+eYoiPcFHhSYHH6na5f0/k=
github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA=
github.com/libp2p/go-libp2p-quic-transport v0.16.0 h1:aVg9/jr+R2esov5sH7wkXrmYmqJiUjtLMLYX3L9KYdY=
21 changes: 20 additions & 1 deletion p2p/net/swarm/testing/testing.go
@@ -37,11 +37,29 @@ type config struct {
connectionGater connmgr.ConnectionGater
rcmgr network.ResourceManager
sk crypto.PrivKey
clock
}

type clock interface {
Now() time.Time
}

type realclock struct{}

func (rc realclock) Now() time.Time {
return time.Now()
}

// Option is an option that can be passed when constructing a test swarm.
type Option func(*testing.T, *config)

// WithClock sets the clock to use for this swarm
func WithClock(clock clock) Option {
return func(_ *testing.T, c *config) {
c.clock = clock
}
}

// OptDisableReuseport disables reuseport in this test swarm.
var OptDisableReuseport Option = func(_ *testing.T, c *config) {
c.disableReuseport = true
@@ -105,6 +123,7 @@ func GenUpgrader(t *testing.T, n *swarm.Swarm, opts ...tptu.Option) transport.Up
// GenSwarm generates a new test swarm.
func GenSwarm(t *testing.T, opts ...Option) *swarm.Swarm {
var cfg config
cfg.clock = realclock{}
for _, o := range opts {
o(t, &cfg)
}
@@ -124,7 +143,7 @@ func GenSwarm(t *testing.T, opts ...Option) *swarm.Swarm {
p.Addr = tnet.ZeroLocalTCPAddress
}

ps, err := pstoremem.NewPeerstore()
ps, err := pstoremem.NewPeerstore(pstoremem.WithClock(cfg.clock))
require.NoError(t, err)
ps.AddPubKey(p.ID, p.PubKey)
ps.AddPrivKey(p.ID, p.PrivKey)
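
Note (a usage sketch, not part of the diff): the WithClock option added above is what lets these tests replace real sleeps with deterministic time control. A minimal example using the mock clock from github.com/benbjohnson/clock, which the test file below imports; *clock.Mock satisfies this package's unexported clock interface because it has a Now() time.Time method:

package testing_test

import (
	"testing"
	"time"

	swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"

	mockClock "github.com/benbjohnson/clock"
)

func TestWithMockClock(t *testing.T) {
	// NewMock returns a clock frozen at the zero time; it only advances
	// when the test calls Add.
	clk := mockClock.NewMock()

	// GenSwarm threads clk through to the in-memory peerstore, so address
	// TTLs are measured against the mock clock, not the wall clock.
	sw := swarmt.GenSwarm(t, swarmt.WithClock(clk))
	t.Log(sw.LocalPeer())

	// Jumping the clock past a TTL expires the corresponding addresses
	// immediately and deterministically, with no time.Sleep.
	clk.Add(time.Second)
}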
1 change: 1 addition & 0 deletions p2p/protocol/identify/id.go
@@ -327,6 +327,7 @@ func (ids *idService) IdentifyWait(c network.Conn) <-chan struct{} {
go func() {
defer close(wait)
if err := ids.identifyConn(c); err != nil {
log.Warnf("failed to identify %s: %s", c.RemotePeer(), err)
ids.emitters.evtPeerIdentificationFailed.Emit(event.EvtPeerIdentificationFailed{Peer: c.RemotePeer(), Reason: err})
return
}
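
Aside from the new log line, the EvtPeerIdentificationFailed emit shown right after it is the programmatic signal for identify failures. A sketch (not part of the diff) of consuming that event through the host's event bus, using the go-libp2p-core APIs this module already depends on (see go.mod above):

package main

import (
	"fmt"

	"github.com/libp2p/go-libp2p-core/event"
	"github.com/libp2p/go-libp2p-core/host"
)

// logIdentifyFailures subscribes to EvtPeerIdentificationFailed, the event
// emitted by id.go above, and prints each failure as it arrives.
// It returns a function that cancels the subscription.
func logIdentifyFailures(h host.Host) (func(), error) {
	sub, err := h.EventBus().Subscribe(new(event.EvtPeerIdentificationFailed))
	if err != nil {
		return nil, err
	}
	go func() {
		for e := range sub.Out() {
			evt := e.(event.EvtPeerIdentificationFailed)
			// evt.Peer and evt.Reason carry the same values id.go logs.
			fmt.Printf("identify failed for %s: %s\n", evt.Peer, evt.Reason)
		}
	}()
	return func() { sub.Close() }, nil
}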
84 changes: 57 additions & 27 deletions p2p/protocol/identify/id_test.go
@@ -12,6 +12,7 @@ import (
"github.com/libp2p/go-libp2p"
blhost "github.com/libp2p/go-libp2p/p2p/host/blank"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
pb "github.com/libp2p/go-libp2p/p2p/protocol/identify/pb"
@@ -28,17 +29,22 @@

"github.com/libp2p/go-eventbus"
"github.com/libp2p/go-libp2p-peerstore/pstoremem"
"github.com/libp2p/go-libp2p-testing/race"

mockClock "github.com/benbjohnson/clock"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-msgio/protoio"
ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func init() {
logging.SetLogLevel("net/identify", "debug")
}

func testKnowsAddrs(t *testing.T, h host.Host, p peer.ID, expected []ma.Multiaddr) {
t.Helper()
assert.ElementsMatchf(t, expected, h.Peerstore().Addrs(p), fmt.Sprintf("%s did not have addr for %s", h.ID(), p))
require.True(t, assert.ElementsMatchf(t, expected, h.Peerstore().Addrs(p), fmt.Sprintf("%s did not have addr for %s", h.ID(), p)))
}

func testHasCertifiedAddrs(t *testing.T, h host.Host, p peer.ID, expected []ma.Multiaddr) {
@@ -62,7 +68,7 @@ func testHasCertifiedAddrs(t *testing.T, h host.Host, p peer.ID, expected []ma.M
if !ok {
t.Error("unexpected record type")
}
assert.ElementsMatchf(t, expected, rec.Addrs, fmt.Sprintf("%s did not have certified addr for %s", h.ID(), p))
require.True(t, assert.ElementsMatchf(t, expected, rec.Addrs, fmt.Sprintf("%s did not have certified addr for %s", h.ID(), p)))
}
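
Why these helpers switch from a bare assert to require.True(assert.ElementsMatchf(...)): assert reports a mismatch and returns false but lets the test keep running, whereas require.True aborts it immediately, so later steps don't stack confusing failures on top of already-bad state. Behaviorally equivalent to this sketch:

if !assert.ElementsMatchf(t, expected, h.Peerstore().Addrs(p), "%s did not have addr for %s", h.ID(), p) {
	t.FailNow() // this is what require.True adds on top of assert
}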

func testHasProtocolVersions(t *testing.T, h host.Host, p peer.ID) {
@@ -147,15 +153,15 @@ func emitAddrChangeEvt(t *testing.T, h host.Host) {
// id service is done.
func TestIDService(t *testing.T) {
// This test is highly timing dependent, waiting on timeouts/expiration.
if race.WithRace() {
t.Skip("skipping highly timing dependent test when race detector is enabled")
}
oldTTL := peerstore.RecentlyConnectedAddrTTL
peerstore.RecentlyConnectedAddrTTL = 500 * time.Millisecond
t.Cleanup(func() { peerstore.RecentlyConnectedAddrTTL = oldTTL })

h1 := blhost.NewBlankHost(swarmt.GenSwarm(t))
h2 := blhost.NewBlankHost(swarmt.GenSwarm(t))
clk := mockClock.NewMock()
swarm1 := swarmt.GenSwarm(t, swarmt.WithClock(clk))
swarm2 := swarmt.GenSwarm(t, swarmt.WithClock(clk))
h1 := blhost.NewBlankHost(swarm1)
h2 := blhost.NewBlankHost(swarm2)

h1p := h1.ID()
h2p := h2.ID()
@@ -212,6 +218,8 @@
testHasPublicKey(t, h2, h1p, h1.Peerstore().PubKey(h1p)) // h1 should have h2's public key

// Need both sides to actually notice that the connection has been closed.
sentDisconnect1 := waitForDisconnectNotification(swarm1)
sentDisconnect2 := waitForDisconnectNotification(swarm2)
h1.Network().ClosePeer(h2p)
h2.Network().ClosePeer(h1p)
if len(h2.Network().ConnsToPeer(h1.ID())) != 0 || len(h1.Network().ConnsToPeer(h2.ID())) != 0 {
@@ -225,10 +233,13 @@
testHasCertifiedAddrs(t, h1, h2p, h2.Peerstore().Addrs(h2p))
testHasCertifiedAddrs(t, h2, h1p, h1.Peerstore().Addrs(h1p))

<-sentDisconnect1
<-sentDisconnect2

// the addrs had their TTLs reduced on disconnect, and
// will be forgotten soon after
t.Log("testing addrs after TTL expiration")
time.Sleep(time.Second)
clk.Add(time.Second)
testKnowsAddrs(t, h1, h2p, []ma.Multiaddr{})
testKnowsAddrs(t, h2, h1p, []ma.Multiaddr{})
testHasCertifiedAddrs(t, h1, h2p, []ma.Multiaddr{})
@@ -794,13 +805,11 @@ func TestLargeIdentifyMessage(t *testing.T) {
peerstore.RecentlyConnectedAddrTTL = 500 * time.Millisecond
t.Cleanup(func() { peerstore.RecentlyConnectedAddrTTL = oldTTL })

sk1, _, err := coretest.RandTestKeyPair(ic.RSA, 4096)
require.NoError(t, err)
sk2, _, err := coretest.RandTestKeyPair(ic.RSA, 4096)
require.NoError(t, err)

h1 := blhost.NewBlankHost(swarmt.GenSwarm(t, swarmt.OptPeerPrivateKey(sk1)))
h2 := blhost.NewBlankHost(swarmt.GenSwarm(t, swarmt.OptPeerPrivateKey(sk2)))
clk := mockClock.NewMock()
swarm1 := swarmt.GenSwarm(t, swarmt.WithClock(clk))
swarm2 := swarmt.GenSwarm(t, swarmt.WithClock(clk))
h1 := blhost.NewBlankHost(swarm1)
h2 := blhost.NewBlankHost(swarm2)

// add protocol strings to make the message larger
// about 2K of protocol strings
@@ -833,10 +842,12 @@
forgetMe, _ := ma.NewMultiaddr("/ip4/1.2.3.4/tcp/1234")
h2.Peerstore().AddAddr(h1p, forgetMe, peerstore.RecentlyConnectedAddrTTL)

require.NoError(t, h1.Connect(context.Background(), h2.Peerstore().PeerInfo(h2p)))
h2pi := h2.Peerstore().PeerInfo(h2p)
h2pi.Addrs = h2pi.Addrs[:1]
require.NoError(t, h1.Connect(context.Background(), h2pi))

h1t2c := h1.Network().ConnsToPeer(h2p)
require.NotEmpty(t, h1t2c, "should have a conn here")
require.Equal(t, 1, len(h1t2c), "should have a conn here")

ids1.IdentifyConn(h1t2c[0])

Expand All @@ -851,7 +862,7 @@ func TestLargeIdentifyMessage(t *testing.T) {
// now, this wait we do have to do. it's the wait for the Listening side
// to be done identifying the connection.
c := h2.Network().ConnsToPeer(h1.ID())
if len(c) < 1 {
if len(c) != 1 {
t.Fatal("should have connection by now at least.")
}
ids2.IdentifyConn(c[0])
@@ -864,6 +875,8 @@
testHasPublicKey(t, h2, h1p, h1.Peerstore().PubKey(h1p)) // h1 should have h2's public key

// Need both sides to actually notice that the connection has been closed.
sentDisconnect1 := waitForDisconnectNotification(swarm1)
sentDisconnect2 := waitForDisconnectNotification(swarm2)
h1.Network().ClosePeer(h2p)
h2.Network().ClosePeer(h1p)
if len(h2.Network().ConnsToPeer(h1.ID())) != 0 || len(h1.Network().ConnsToPeer(h2.ID())) != 0 {
@@ -877,10 +890,13 @@ func TestLargeIdentifyMessage(t *testing.T) {
testHasCertifiedAddrs(t, h1, h2p, h2.Peerstore().Addrs(h2p))
testHasCertifiedAddrs(t, h2, h1p, h1.Peerstore().Addrs(h1p))

<-sentDisconnect1
<-sentDisconnect2

// the addrs had their TTLs reduced on disconnect, and
// will be forgotten soon after
t.Log("testing addrs after TTL expiration")
time.Sleep(time.Second)
clk.Add(time.Second)
testKnowsAddrs(t, h1, h2p, []ma.Multiaddr{})
testKnowsAddrs(t, h2, h1p, []ma.Multiaddr{})
testHasCertifiedAddrs(t, h1, h2p, []ma.Multiaddr{})
@@ -898,13 +914,8 @@ func TestLargePushMessage(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

sk1, _, err := coretest.RandTestKeyPair(ic.RSA, 4096)
require.NoError(t, err)
sk2, _, err := coretest.RandTestKeyPair(ic.RSA, 4096)
require.NoError(t, err)

h1 := blhost.NewBlankHost(swarmt.GenSwarm(t, swarmt.OptPeerPrivateKey(sk1)))
h2 := blhost.NewBlankHost(swarmt.GenSwarm(t, swarmt.OptPeerPrivateKey(sk2)))
h1 := blhost.NewBlankHost(swarmt.GenSwarm(t))
h2 := blhost.NewBlankHost(swarmt.GenSwarm(t))

// add protocol strings to make the message larger
// about 2K of protocol strings
@@ -1109,3 +1120,22 @@ func waitForAddrInStream(t *testing.T, s <-chan ma.Multiaddr, expected ma.Multia
}
}
}

func waitForDisconnectNotification(swarm *swarm.Swarm) <-chan struct{} {
done := make(chan struct{})
var once sync.Once
var nb *network.NotifyBundle
nb = &network.NotifyBundle{
DisconnectedF: func(n network.Network, c network.Conn) {
once.Do(func() {
go func() {
swarm.StopNotify(nb)
close(done)
}()
})
},
}
swarm.Notify(nb)

return done
}
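
A note on the new helper (not part of the diff): the flakiness it fixes is a race between closing the connection and the peerstore's TTL bookkeeping. The addresses are only downgraded to RecentlyConnectedAddrTTL once the swarm's Disconnected notifications have been delivered, so advancing the mock clock before that point would expire nothing. Condensed from the tests above, the calling pattern looks like this (sw, h1, h2p, clk as in TestIDService):

sentDisconnect := waitForDisconnectNotification(sw)
h1.Network().ClosePeer(h2p)
<-sentDisconnect      // Disconnected has been delivered; TTLs are now reduced
clk.Add(time.Second)  // deterministically expire the reduced TTLs

StopNotify runs in a fresh goroutine rather than inline in the callback, presumably because the swarm may still hold its notifiee lock while dispatching the notification, and unregistering inline could deadlock.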