Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

identify: refactor sending of Identify pushes #1984

Merged
merged 9 commits into from
Feb 8, 2023
3 changes: 2 additions & 1 deletion p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) {

h.updateLocalIpAddr()

if h.emitters.evtLocalProtocolsUpdated, err = h.eventbus.Emitter(&event.EvtLocalProtocolsUpdated{}); err != nil {
if h.emitters.evtLocalProtocolsUpdated, err = h.eventbus.Emitter(&event.EvtLocalProtocolsUpdated{}, eventbus.Stateful); err != nil {
return nil, err
}
if h.emitters.evtLocalAddrsUpdated, err = h.eventbus.Emitter(&event.EvtLocalAddressesUpdated{}, eventbus.Stateful); err != nil {
Expand Down Expand Up @@ -367,6 +367,7 @@ func (h *BasicHost) updateLocalIpAddr() {
func (h *BasicHost) Start() {
h.psManager.Start()
h.refCount.Add(1)
h.ids.Start()
go h.background()
}

Expand Down
21 changes: 12 additions & 9 deletions p2p/host/basic/basic_host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func TestHostAddrsFactory(t *testing.T) {

addrs := h.Addrs()
if len(addrs) != 1 {
t.Fatalf("expected 1 addr, got %d", len(addrs))
t.Fatalf("expected 1 addr, got %+v", addrs)
}
if !addrs[0].Equal(maddr) {
t.Fatalf("expected %s, got %s", maddr.String(), addrs[0].String())
Expand Down Expand Up @@ -245,8 +245,10 @@ func getHostPair(t *testing.T) (host.Host, host.Host) {

h1, err := NewHost(swarmt.GenSwarm(t), nil)
require.NoError(t, err)
h1.Start()
h2, err := NewHost(swarmt.GenSwarm(t), nil)
require.NoError(t, err)
h2.Start()

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
Expand Down Expand Up @@ -342,14 +344,12 @@ func TestHostProtoMismatch(t *testing.T) {
}

func TestHostProtoPreknowledge(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

h1, err := NewHost(swarmt.GenSwarm(t), nil)
require.NoError(t, err)
defer h1.Close()

h2, err := NewHost(swarmt.GenSwarm(t), nil)
require.NoError(t, err)
defer h1.Close()
defer h2.Close()

conn := make(chan protocol.ID)
Expand All @@ -362,8 +362,11 @@ func TestHostProtoPreknowledge(t *testing.T) {
// Prevent pushing identify information so this test actually _uses_ the super protocol.
h1.RemoveStreamHandler(identify.IDPush)

h1.Start()
h2.Start()

h2pi := h2.Peerstore().PeerInfo(h2.ID())
require.NoError(t, h1.Connect(ctx, h2pi))
require.NoError(t, h1.Connect(context.Background(), h2pi))

// wait for identify handshake to finish completely
select {
Expand All @@ -380,12 +383,12 @@ func TestHostProtoPreknowledge(t *testing.T) {

h2.SetStreamHandler("/foo", handler)

s, err := h1.NewStream(ctx, h2.ID(), "/foo", "/bar", "/super")
s, err := h1.NewStream(context.Background(), h2.ID(), "/foo", "/bar", "/super")
require.NoError(t, err)

select {
case p := <-conn:
t.Fatal("shouldnt have gotten connection yet, we should have a lazy stream: ", p)
t.Fatal("shouldn't have gotten connection yet, we should have a lazy stream: ", p)
case <-time.After(time.Millisecond * 50):
}

Expand Down Expand Up @@ -532,7 +535,6 @@ func TestAddrChangeImmediatelyIfAddressNonEmpty(t *testing.T) {
return taddrs
}})
require.NoError(t, err)
h.Start()
defer h.Close()

sub, err := h.EventBus().Subscribe(&event.EvtLocalAddressesUpdated{})
Expand All @@ -541,6 +543,7 @@ func TestAddrChangeImmediatelyIfAddressNonEmpty(t *testing.T) {
t.Error(err)
}
defer sub.Close()
h.Start()

expected := event.EvtLocalAddressesUpdated{
Diffs: true,
Expand Down
2 changes: 1 addition & 1 deletion p2p/net/mock/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ func TestLimitedStreams(t *testing.T) {
}

wg.Wait()
if !within(time.Since(before), time.Second*2, time.Second) {
if !within(time.Since(before), time.Second*5/2, time.Second) {
t.Fatal("Expected 2ish seconds but got ", time.Since(before))
}
}
Expand Down
21 changes: 20 additions & 1 deletion p2p/protocol/holepunch/holepunch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import (
"testing"
"time"

"github.com/libp2p/go-libp2p/p2p/transport/tcp"

"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-testing/race"
"github.com/libp2p/go-libp2p/core/host"
Expand Down Expand Up @@ -68,6 +72,8 @@ var _ identify.IDService = &mockIDService{}
func newMockIDService(t *testing.T, h host.Host) identify.IDService {
ids, err := identify.NewIDService(h)
require.NoError(t, err)
ids.Start()
t.Cleanup(func() { ids.Close() })
return &mockIDService{IDService: ids}
}

Expand Down Expand Up @@ -448,10 +454,23 @@ func makeRelayedHosts(t *testing.T, h1opt, h2opt []holepunch.Option, addHolePunc
libp2p.ResourceManager(&network.NullResourceManager{}),
)
require.NoError(t, err)

_, err = relayv1.NewRelay(relay)
require.NoError(t, err)

// make sure the relay service is started and advertised by Identify
h, err := libp2p.New(
libp2p.NoListenAddrs,
libp2p.Transport(tcp.NewTCPTransport),
libp2p.DisableRelay(),
)
require.NoError(t, err)
defer h.Close()
require.NoError(t, h.Connect(context.Background(), peer.AddrInfo{ID: relay.ID(), Addrs: relay.Addrs()}))
require.Eventually(t, func() bool {
supported, err := h.Peerstore().SupportsProtocols(relay.ID(), proto.ProtoIDv2Hop, relayv1.ProtoID)
return err == nil && len(supported) > 0
}, 3*time.Second, 100*time.Millisecond)

h2 = mkHostWithStaticAutoRelay(t, relay)
if addHolePuncher {
hps = addHolePunchService(t, h2, h2opt...)
Expand Down
Loading