From 635a5c798eb3d2b3468fbd7004f93865b1527eae Mon Sep 17 00:00:00 2001 From: Will Scott Date: Mon, 13 Apr 2020 17:10:01 -0700 Subject: [PATCH 1/7] integration test for the dual dht --- test/integration/wan_lan_dht_test.go | 223 +++++++++++++++++++++++++++ 1 file changed, 223 insertions(+) create mode 100644 test/integration/wan_lan_dht_test.go diff --git a/test/integration/wan_lan_dht_test.go b/test/integration/wan_lan_dht_test.go new file mode 100644 index 00000000000..7950a1254e5 --- /dev/null +++ b/test/integration/wan_lan_dht_test.go @@ -0,0 +1,223 @@ +package integrationtest + +import ( + "context" + "encoding/binary" + "fmt" + "math" + "math/rand" + "net" + "testing" + "time" + + "github.com/ipfs/go-cid" + "github.com/ipfs/go-ipfs/core" + "github.com/ipfs/go-ipfs/core/bootstrap" + mock "github.com/ipfs/go-ipfs/core/mock" + + corenet "github.com/libp2p/go-libp2p-core/network" + peer "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/peerstore" + testutil "github.com/libp2p/go-libp2p-testing/net" + mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" + + ma "github.com/multiformats/go-multiaddr" +) + +func TestDHTConnectivityFast(t *testing.T) { + conf := testutil.LatencyConfig{ + NetworkLatency: 0, + RoutingLatency: 0, + BlockstoreLatency: 0, + } + if err := RunDHTConnectivity(conf, 5); err != nil { + t.Fatal(err) + } +} + +func TestDHTConnectivitySlowNetwork(t *testing.T) { + SkipUnlessEpic(t) + conf := testutil.LatencyConfig{NetworkLatency: 400 * time.Millisecond} + if err := RunDHTConnectivity(conf, 5); err != nil { + t.Fatal(err) + } +} + +func TestDHTConnectivitySlowRouting(t *testing.T) { + SkipUnlessEpic(t) + conf := testutil.LatencyConfig{RoutingLatency: 400 * time.Millisecond} + if err := RunDHTConnectivity(conf, 5); err != nil { + t.Fatal(err) + } +} + +var wanPrefix = net.ParseIP("100::") +var lanPrefix = net.ParseIP("fe80::") + +func makeAddr(n uint32, wan bool) ma.Multiaddr { + var ip net.IP + if wan { + ip = append(net.IP{}, wanPrefix...) + } else { + ip = append(net.IP{}, lanPrefix...) + } + + binary.LittleEndian.PutUint32(ip[4:], n) + addr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip6/%s/tcp/4242", ip)) + return addr +} + +func RunDHTConnectivity(conf testutil.LatencyConfig, numPeers int) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // create network + mn := mocknet.New(ctx) + mn.SetLinkDefaults(mocknet.LinkOptions{ + Latency: conf.NetworkLatency, + Bandwidth: math.MaxInt32, + }) + + testPeer, err := core.NewNode(ctx, &core.BuildCfg{ + Online: true, + Host: mock.MockHostOption(mn), + }) + if err != nil { + return err + } + defer testPeer.Close() + + wanPeers := []*core.IpfsNode{} + lanPeers := []*core.IpfsNode{} + + for i := 0; i < numPeers; i++ { + wanPeer, err := core.NewNode(ctx, &core.BuildCfg{ + Online: true, + Host: mock.MockHostOption(mn), + }) + if err != nil { + return err + } + defer wanPeer.Close() + wanAddr := makeAddr(uint32(i), true) + wanPeer.Peerstore.AddAddr(wanPeer.Identity, wanAddr, peerstore.PermanentAddrTTL) + for _, p := range wanPeers { + mn.ConnectPeers(p.Identity, wanPeer.Identity) + } + wanPeers = append(wanPeers, wanPeer) + + lanPeer, err := core.NewNode(ctx, &core.BuildCfg{ + Online: true, + Host: mock.MockHostOption(mn), + }) + if err != nil { + return err + } + defer lanPeer.Close() + lanAddr := makeAddr(uint32(i), false) + lanPeer.Peerstore.AddAddr(lanPeer.Identity, lanAddr, peerstore.PermanentAddrTTL) + for _, p := range lanPeers { + mn.ConnectPeers(p.Identity, lanPeer.Identity) + } + lanPeers = append(lanPeers, lanPeer) + } + + // The test peer is connected to one lan peer. + _, err = mn.ConnectPeers(testPeer.Identity, lanPeers[0].Identity) + if err != nil { + return err + } + + err, done := <-testPeer.DHT.RefreshRoutingTable() + if err != nil || !done { + if !done { + err = fmt.Errorf("expected refresh routing table to close") + } + return err + } + + // choose a lan peer and validate lan DHT is functioning. + i := rand.Intn(len(lanPeers)) + if testPeer.PeerHost.Network().Connectedness(lanPeers[i].Identity) == corenet.Connected { + testPeer.PeerHost.Network().ClosePeer(lanPeers[i].Identity) + testPeer.PeerHost.Peerstore().ClearAddrs(lanPeers[i].Identity) + } + // That peer will provide a new CID, and we'll validate the test node can find it. + provideCid := cid.NewCidV1(cid.Raw, []byte("Lan Provide Record")) + provideCtx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + if err := lanPeers[i].DHT.Provide(provideCtx, provideCid, true); err != nil { + return err + } + provs, err := testPeer.DHT.FindProviders(provideCtx, provideCid) + if err != nil { + return err + } + if len(provs) != 1 { + return fmt.Errorf("Expected one provider, got %d", len(provs)) + } + if provs[0].ID != lanPeers[i].Identity { + return fmt.Errorf("Unexpected lan peer provided record") + } + + // Now, bootstrap from a wan peer. + bis := wanPeers[0].Peerstore.PeerInfo(wanPeers[0].PeerHost.ID()) + bcfg := bootstrap.BootstrapConfigWithPeers([]peer.AddrInfo{bis}) + if err := testPeer.Bootstrap(bcfg); err != nil { + return err + } + + err, done = <-testPeer.DHT.RefreshRoutingTable() + if err != nil || !done { + if !done { + err = fmt.Errorf("expected refresh routing table to close") + } + return err + } + + // choose a wan peer and validate wan DHT is functioning. + i = rand.Intn(len(wanPeers)) + if testPeer.PeerHost.Network().Connectedness(wanPeers[i].Identity) == corenet.Connected { + testPeer.PeerHost.Network().ClosePeer(wanPeers[i].Identity) + testPeer.PeerHost.Peerstore().ClearAddrs(wanPeers[i].Identity) + } + // That peer will provide a new CID, and we'll validate the test node can find it. + wanCid := cid.NewCidV1(cid.Raw, []byte("Wan Provide Record")) + wanProvideCtx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + if err := wanPeers[i].DHT.Provide(wanProvideCtx, wanCid, true); err != nil { + return err + } + provs, err = testPeer.DHT.FindProviders(wanProvideCtx, wanCid) + if err != nil { + return err + } + if len(provs) != 1 { + return fmt.Errorf("Expected one provider, got %d", len(provs)) + } + if provs[0].ID != wanPeers[i].Identity { + return fmt.Errorf("Unexpected lan peer provided record") + } + + // Finally, re-share the lan provided cid from a wan peer and expect a merged result. + i = rand.Intn(len(wanPeers)) + if testPeer.PeerHost.Network().Connectedness(wanPeers[i].Identity) == corenet.Connected { + testPeer.PeerHost.Network().ClosePeer(wanPeers[i].Identity) + testPeer.PeerHost.Peerstore().ClearAddrs(wanPeers[i].Identity) + } + + provideCtx, cancel = context.WithTimeout(ctx, time.Second) + defer cancel() + if err := wanPeers[i].DHT.Provide(provideCtx, provideCid, true); err != nil { + return err + } + provs, err = testPeer.DHT.FindProviders(provideCtx, provideCid) + if err != nil { + return err + } + if len(provs) != 2 { + return fmt.Errorf("Expected two providers, got %d", len(provs)) + } + + return nil +} From 02cf54dec679cf723fb6a48ef32e9f1b70c23296 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Tue, 14 Apr 2020 08:15:19 -0700 Subject: [PATCH 2/7] rebase on dual DHT --- test/integration/wan_lan_dht_test.go | 58 ++++++++++++++++++---------- 1 file changed, 37 insertions(+), 21 deletions(-) diff --git a/test/integration/wan_lan_dht_test.go b/test/integration/wan_lan_dht_test.go index 7950a1254e5..e66c78d0f75 100644 --- a/test/integration/wan_lan_dht_test.go +++ b/test/integration/wan_lan_dht_test.go @@ -102,6 +102,7 @@ func RunDHTConnectivity(conf testutil.LatencyConfig, numPeers int) error { wanAddr := makeAddr(uint32(i), true) wanPeer.Peerstore.AddAddr(wanPeer.Identity, wanAddr, peerstore.PermanentAddrTTL) for _, p := range wanPeers { + mn.LinkPeers(p.Identity, wanPeer.Identity) mn.ConnectPeers(p.Identity, wanPeer.Identity) } wanPeers = append(wanPeers, wanPeer) @@ -117,18 +118,30 @@ func RunDHTConnectivity(conf testutil.LatencyConfig, numPeers int) error { lanAddr := makeAddr(uint32(i), false) lanPeer.Peerstore.AddAddr(lanPeer.Identity, lanAddr, peerstore.PermanentAddrTTL) for _, p := range lanPeers { + mn.LinkPeers(p.Identity, lanPeer.Identity) mn.ConnectPeers(p.Identity, lanPeer.Identity) } lanPeers = append(lanPeers, lanPeer) } + // Add interfaces / addresses to test peer. + wanAddr := makeAddr(0, true) + testPeer.Peerstore.AddAddr(testPeer.Identity, wanAddr, peerstore.PermanentAddrTTL) + lanAddr := makeAddr(0, false) + testPeer.Peerstore.AddAddr(testPeer.Identity, lanAddr, peerstore.PermanentAddrTTL) + // The test peer is connected to one lan peer. + for _, p := range lanPeers { + if _, err := mn.LinkPeers(testPeer.Identity, p.Identity); err != nil { + return err + } + } _, err = mn.ConnectPeers(testPeer.Identity, lanPeers[0].Identity) if err != nil { return err } - err, done := <-testPeer.DHT.RefreshRoutingTable() + err, done := <-testPeer.DHT.LAN.RefreshRoutingTable() if err != nil || !done { if !done { err = fmt.Errorf("expected refresh routing table to close") @@ -149,25 +162,28 @@ func RunDHTConnectivity(conf testutil.LatencyConfig, numPeers int) error { if err := lanPeers[i].DHT.Provide(provideCtx, provideCid, true); err != nil { return err } - provs, err := testPeer.DHT.FindProviders(provideCtx, provideCid) - if err != nil { - return err + provChan := testPeer.DHT.FindProvidersAsync(provideCtx, provideCid, 0) + prov, ok := <-provChan + if !ok || prov.ID == "" { + return fmt.Errorf("Expected provider. stream closed early") } - if len(provs) != 1 { - return fmt.Errorf("Expected one provider, got %d", len(provs)) - } - if provs[0].ID != lanPeers[i].Identity { + if prov.ID != lanPeers[i].Identity { return fmt.Errorf("Unexpected lan peer provided record") } // Now, bootstrap from a wan peer. + for _, p := range wanPeers { + if _, err := mn.LinkPeers(testPeer.Identity, p.Identity); err != nil { + return err + } + } bis := wanPeers[0].Peerstore.PeerInfo(wanPeers[0].PeerHost.ID()) bcfg := bootstrap.BootstrapConfigWithPeers([]peer.AddrInfo{bis}) if err := testPeer.Bootstrap(bcfg); err != nil { return err } - err, done = <-testPeer.DHT.RefreshRoutingTable() + err, done = <-testPeer.DHT.WAN.RefreshRoutingTable() if err != nil || !done { if !done { err = fmt.Errorf("expected refresh routing table to close") @@ -188,14 +204,12 @@ func RunDHTConnectivity(conf testutil.LatencyConfig, numPeers int) error { if err := wanPeers[i].DHT.Provide(wanProvideCtx, wanCid, true); err != nil { return err } - provs, err = testPeer.DHT.FindProviders(wanProvideCtx, wanCid) - if err != nil { - return err + provChan = testPeer.DHT.FindProvidersAsync(wanProvideCtx, wanCid, 0) + prov, ok = <-provChan + if !ok || prov.ID == "" { + return fmt.Errorf("Expected one provider, closed early") } - if len(provs) != 1 { - return fmt.Errorf("Expected one provider, got %d", len(provs)) - } - if provs[0].ID != wanPeers[i].Identity { + if prov.ID != wanPeers[i].Identity { return fmt.Errorf("Unexpected lan peer provided record") } @@ -211,12 +225,14 @@ func RunDHTConnectivity(conf testutil.LatencyConfig, numPeers int) error { if err := wanPeers[i].DHT.Provide(provideCtx, provideCid, true); err != nil { return err } - provs, err = testPeer.DHT.FindProviders(provideCtx, provideCid) - if err != nil { - return err + provChan = testPeer.DHT.FindProvidersAsync(provideCtx, provideCid, 0) + prov, ok = <-provChan + if !ok { + return fmt.Errorf("Expected two providers, got 0") } - if len(provs) != 2 { - return fmt.Errorf("Expected two providers, got %d", len(provs)) + prov, ok = <-provChan + if !ok { + return fmt.Errorf("Expected two providers, got 1") } return nil From 94de6055702618c9b1b5f790965e18d72fd75434 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Tue, 14 Apr 2020 12:26:02 -0700 Subject: [PATCH 3/7] startup bootstrap --- test/integration/wan_lan_dht_test.go | 51 ++++++++++++++++++++++------ 1 file changed, 40 insertions(+), 11 deletions(-) diff --git a/test/integration/wan_lan_dht_test.go b/test/integration/wan_lan_dht_test.go index e66c78d0f75..7d32aaffd35 100644 --- a/test/integration/wan_lan_dht_test.go +++ b/test/integration/wan_lan_dht_test.go @@ -7,6 +7,7 @@ import ( "math" "math/rand" "net" + "os" "testing" "time" @@ -18,6 +19,7 @@ import ( corenet "github.com/libp2p/go-libp2p-core/network" peer "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" + kbucket "github.com/libp2p/go-libp2p-kbucket" testutil "github.com/libp2p/go-libp2p-testing/net" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" @@ -90,6 +92,8 @@ func RunDHTConnectivity(conf testutil.LatencyConfig, numPeers int) error { wanPeers := []*core.IpfsNode{} lanPeers := []*core.IpfsNode{} + connectionContext, connCtxCancel := context.WithTimeout(ctx, 15*time.Second) + defer connCtxCancel() for i := 0; i < numPeers; i++ { wanPeer, err := core.NewNode(ctx, &core.BuildCfg{ Online: true, @@ -103,7 +107,7 @@ func RunDHTConnectivity(conf testutil.LatencyConfig, numPeers int) error { wanPeer.Peerstore.AddAddr(wanPeer.Identity, wanAddr, peerstore.PermanentAddrTTL) for _, p := range wanPeers { mn.LinkPeers(p.Identity, wanPeer.Identity) - mn.ConnectPeers(p.Identity, wanPeer.Identity) + wanPeer.PeerHost.Connect(connectionContext, p.Peerstore.PeerInfo(p.Identity)) } wanPeers = append(wanPeers, wanPeer) @@ -119,10 +123,11 @@ func RunDHTConnectivity(conf testutil.LatencyConfig, numPeers int) error { lanPeer.Peerstore.AddAddr(lanPeer.Identity, lanAddr, peerstore.PermanentAddrTTL) for _, p := range lanPeers { mn.LinkPeers(p.Identity, lanPeer.Identity) - mn.ConnectPeers(p.Identity, lanPeer.Identity) + lanPeer.PeerHost.Connect(connectionContext, p.Peerstore.PeerInfo(p.Identity)) } lanPeers = append(lanPeers, lanPeer) } + connCtxCancel() // Add interfaces / addresses to test peer. wanAddr := makeAddr(0, true) @@ -136,24 +141,48 @@ func RunDHTConnectivity(conf testutil.LatencyConfig, numPeers int) error { return err } } - _, err = mn.ConnectPeers(testPeer.Identity, lanPeers[0].Identity) + err = testPeer.PeerHost.Connect(ctx, lanPeers[0].Peerstore.PeerInfo(lanPeers[0].Identity)) if err != nil { return err } - err, done := <-testPeer.DHT.LAN.RefreshRoutingTable() - if err != nil || !done { - if !done { - err = fmt.Errorf("expected refresh routing table to close") + startupCtx, startupCancel := context.WithTimeout(ctx, time.Second*15) + testPeer.DHT.Bootstrap(startupCtx) +StartupWait: + for { + select { + case err, done := <-testPeer.DHT.LAN.RefreshRoutingTable(): + if err.Error() == kbucket.ErrLookupFailure.Error() || + testPeer.DHT.LAN.RoutingTable() == nil || + testPeer.DHT.LAN.RoutingTable().Size() == 0 { + time.Sleep(100 * time.Millisecond) + continue + } + if err != nil || !done { + if !done { + err = fmt.Errorf("expected refresh routing table to close") + } + fmt.Fprintf(os.Stderr, "how odd. that was lookupfailure.\n") + startupCancel() + return err + } + break StartupWait + case <-startupCtx.Done(): + startupCancel() + return fmt.Errorf("expected faster dht bootstrap") } - return err } + startupCancel() + fmt.Fprintf(os.Stderr, "finding provider\n") // choose a lan peer and validate lan DHT is functioning. i := rand.Intn(len(lanPeers)) if testPeer.PeerHost.Network().Connectedness(lanPeers[i].Identity) == corenet.Connected { - testPeer.PeerHost.Network().ClosePeer(lanPeers[i].Identity) - testPeer.PeerHost.Peerstore().ClearAddrs(lanPeers[i].Identity) + i = (i + 1) % len(lanPeers) + if testPeer.PeerHost.Network().Connectedness(lanPeers[i].Identity) == corenet.Connected { + testPeer.PeerHost.Network().ClosePeer(lanPeers[i].Identity) + testPeer.PeerHost.Peerstore().ClearAddrs(lanPeers[i].Identity) + } } // That peer will provide a new CID, and we'll validate the test node can find it. provideCid := cid.NewCidV1(cid.Raw, []byte("Lan Provide Record")) @@ -183,7 +212,7 @@ func RunDHTConnectivity(conf testutil.LatencyConfig, numPeers int) error { return err } - err, done = <-testPeer.DHT.WAN.RefreshRoutingTable() + err, done := <-testPeer.DHT.WAN.RefreshRoutingTable() if err != nil || !done { if !done { err = fmt.Errorf("expected refresh routing table to close") From b61d9639f6cc5b1f0c316929694243a82d6095ab Mon Sep 17 00:00:00 2001 From: Will Scott Date: Tue, 14 Apr 2020 13:03:33 -0700 Subject: [PATCH 4/7] better address generation --- test/integration/wan_lan_dht_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/wan_lan_dht_test.go b/test/integration/wan_lan_dht_test.go index 7d32aaffd35..0bfcb081e50 100644 --- a/test/integration/wan_lan_dht_test.go +++ b/test/integration/wan_lan_dht_test.go @@ -64,7 +64,7 @@ func makeAddr(n uint32, wan bool) ma.Multiaddr { ip = append(net.IP{}, lanPrefix...) } - binary.LittleEndian.PutUint32(ip[4:], n) + binary.LittleEndian.PutUint32(ip[12:], n) addr, _ := ma.NewMultiaddr(fmt.Sprintf("/ip6/%s/tcp/4242", ip)) return addr } From 5079cfa6241ef7d314fea1c2ef5af8fb5a5cd659 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Tue, 14 Apr 2020 14:29:08 -0700 Subject: [PATCH 5/7] minor cleanup / progress --- test/integration/wan_lan_dht_test.go | 67 ++++++++++++++++------------ 1 file changed, 38 insertions(+), 29 deletions(-) diff --git a/test/integration/wan_lan_dht_test.go b/test/integration/wan_lan_dht_test.go index 0bfcb081e50..2ecee82b23b 100644 --- a/test/integration/wan_lan_dht_test.go +++ b/test/integration/wan_lan_dht_test.go @@ -13,13 +13,10 @@ import ( "github.com/ipfs/go-cid" "github.com/ipfs/go-ipfs/core" - "github.com/ipfs/go-ipfs/core/bootstrap" mock "github.com/ipfs/go-ipfs/core/mock" corenet "github.com/libp2p/go-libp2p-core/network" - peer "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" - kbucket "github.com/libp2p/go-libp2p-kbucket" testutil "github.com/libp2p/go-libp2p-testing/net" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" @@ -146,26 +143,20 @@ func RunDHTConnectivity(conf testutil.LatencyConfig, numPeers int) error { return err } - startupCtx, startupCancel := context.WithTimeout(ctx, time.Second*15) - testPeer.DHT.Bootstrap(startupCtx) + startupCtx, startupCancel := context.WithTimeout(ctx, time.Second*60) StartupWait: for { select { - case err, done := <-testPeer.DHT.LAN.RefreshRoutingTable(): - if err.Error() == kbucket.ErrLookupFailure.Error() || - testPeer.DHT.LAN.RoutingTable() == nil || - testPeer.DHT.LAN.RoutingTable().Size() == 0 { + case err := <-testPeer.DHT.LAN.RefreshRoutingTable(): + if err != nil { + fmt.Printf("Error refreshing routing table: %v\n", err) + } + if testPeer.DHT.LAN.RoutingTable() == nil || + testPeer.DHT.LAN.RoutingTable().Size() == 0 || + err != nil { time.Sleep(100 * time.Millisecond) continue } - if err != nil || !done { - if !done { - err = fmt.Errorf("expected refresh routing table to close") - } - fmt.Fprintf(os.Stderr, "how odd. that was lookupfailure.\n") - startupCancel() - return err - } break StartupWait case <-startupCtx.Done(): startupCancel() @@ -174,7 +165,6 @@ StartupWait: } startupCancel() - fmt.Fprintf(os.Stderr, "finding provider\n") // choose a lan peer and validate lan DHT is functioning. i := rand.Intn(len(lanPeers)) if testPeer.PeerHost.Network().Connectedness(lanPeers[i].Identity) == corenet.Connected { @@ -200,32 +190,51 @@ StartupWait: return fmt.Errorf("Unexpected lan peer provided record") } - // Now, bootstrap from a wan peer. + fmt.Fprintf(os.Stderr, "moving on to WAN.\n") + // Now, connect with a wan peer. for _, p := range wanPeers { if _, err := mn.LinkPeers(testPeer.Identity, p.Identity); err != nil { return err } } - bis := wanPeers[0].Peerstore.PeerInfo(wanPeers[0].PeerHost.ID()) - bcfg := bootstrap.BootstrapConfigWithPeers([]peer.AddrInfo{bis}) - if err := testPeer.Bootstrap(bcfg); err != nil { + + err = testPeer.PeerHost.Connect(ctx, wanPeers[0].Peerstore.PeerInfo(wanPeers[0].Identity)) + if err != nil { return err } - err, done := <-testPeer.DHT.WAN.RefreshRoutingTable() - if err != nil || !done { - if !done { - err = fmt.Errorf("expected refresh routing table to close") + startupCtx, startupCancel = context.WithTimeout(ctx, time.Second*60*5) +WanStartupWait: + for { + select { + case err := <-testPeer.DHT.WAN.RefreshRoutingTable(): + //if err != nil { + // fmt.Printf("Error refreshing routing table: %v\n", err) + //} + if testPeer.DHT.WAN.RoutingTable() == nil || + testPeer.DHT.WAN.RoutingTable().Size() == 0 || + err != nil { + time.Sleep(100 * time.Millisecond) + continue + } + break WanStartupWait + case <-startupCtx.Done(): + startupCancel() + return fmt.Errorf("expected faster wan dht bootstrap") } - return err } + startupCancel() // choose a wan peer and validate wan DHT is functioning. i = rand.Intn(len(wanPeers)) if testPeer.PeerHost.Network().Connectedness(wanPeers[i].Identity) == corenet.Connected { - testPeer.PeerHost.Network().ClosePeer(wanPeers[i].Identity) - testPeer.PeerHost.Peerstore().ClearAddrs(wanPeers[i].Identity) + i = (i + 1) % len(wanPeers) + if testPeer.PeerHost.Network().Connectedness(wanPeers[i].Identity) == corenet.Connected { + testPeer.PeerHost.Network().ClosePeer(wanPeers[i].Identity) + testPeer.PeerHost.Peerstore().ClearAddrs(wanPeers[i].Identity) + } } + // That peer will provide a new CID, and we'll validate the test node can find it. wanCid := cid.NewCidV1(cid.Raw, []byte("Wan Provide Record")) wanProvideCtx, cancel := context.WithTimeout(ctx, time.Second) From 971cbf747ae3d85d2e9ec9f291e6907807c49822 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Tue, 14 Apr 2020 16:01:25 -0700 Subject: [PATCH 6/7] test passes --- test/integration/wan_lan_dht_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/test/integration/wan_lan_dht_test.go b/test/integration/wan_lan_dht_test.go index 2ecee82b23b..17714e6c579 100644 --- a/test/integration/wan_lan_dht_test.go +++ b/test/integration/wan_lan_dht_test.go @@ -7,13 +7,13 @@ import ( "math" "math/rand" "net" - "os" "testing" "time" "github.com/ipfs/go-cid" "github.com/ipfs/go-ipfs/core" mock "github.com/ipfs/go-ipfs/core/mock" + libp2p2 "github.com/ipfs/go-ipfs/core/node/libp2p" corenet "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peerstore" @@ -93,8 +93,9 @@ func RunDHTConnectivity(conf testutil.LatencyConfig, numPeers int) error { defer connCtxCancel() for i := 0; i < numPeers; i++ { wanPeer, err := core.NewNode(ctx, &core.BuildCfg{ - Online: true, - Host: mock.MockHostOption(mn), + Online: true, + Routing: libp2p2.DHTServerOption, + Host: mock.MockHostOption(mn), }) if err != nil { return err @@ -190,7 +191,6 @@ StartupWait: return fmt.Errorf("Unexpected lan peer provided record") } - fmt.Fprintf(os.Stderr, "moving on to WAN.\n") // Now, connect with a wan peer. for _, p := range wanPeers { if _, err := mn.LinkPeers(testPeer.Identity, p.Identity); err != nil { @@ -203,7 +203,7 @@ StartupWait: return err } - startupCtx, startupCancel = context.WithTimeout(ctx, time.Second*60*5) + startupCtx, startupCancel = context.WithTimeout(ctx, time.Second*60) WanStartupWait: for { select { From e5a41875bae7215c902435f35af50d0984c1add9 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Tue, 14 Apr 2020 16:05:29 -0700 Subject: [PATCH 7/7] lint --- test/integration/wan_lan_dht_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/test/integration/wan_lan_dht_test.go b/test/integration/wan_lan_dht_test.go index 17714e6c579..77c6f129bc0 100644 --- a/test/integration/wan_lan_dht_test.go +++ b/test/integration/wan_lan_dht_test.go @@ -104,8 +104,8 @@ func RunDHTConnectivity(conf testutil.LatencyConfig, numPeers int) error { wanAddr := makeAddr(uint32(i), true) wanPeer.Peerstore.AddAddr(wanPeer.Identity, wanAddr, peerstore.PermanentAddrTTL) for _, p := range wanPeers { - mn.LinkPeers(p.Identity, wanPeer.Identity) - wanPeer.PeerHost.Connect(connectionContext, p.Peerstore.PeerInfo(p.Identity)) + _, _ = mn.LinkPeers(p.Identity, wanPeer.Identity) + _ = wanPeer.PeerHost.Connect(connectionContext, p.Peerstore.PeerInfo(p.Identity)) } wanPeers = append(wanPeers, wanPeer) @@ -120,8 +120,8 @@ func RunDHTConnectivity(conf testutil.LatencyConfig, numPeers int) error { lanAddr := makeAddr(uint32(i), false) lanPeer.Peerstore.AddAddr(lanPeer.Identity, lanAddr, peerstore.PermanentAddrTTL) for _, p := range lanPeers { - mn.LinkPeers(p.Identity, lanPeer.Identity) - lanPeer.PeerHost.Connect(connectionContext, p.Peerstore.PeerInfo(p.Identity)) + _, _ = mn.LinkPeers(p.Identity, lanPeer.Identity) + _ = lanPeer.PeerHost.Connect(connectionContext, p.Peerstore.PeerInfo(p.Identity)) } lanPeers = append(lanPeers, lanPeer) } @@ -171,7 +171,7 @@ StartupWait: if testPeer.PeerHost.Network().Connectedness(lanPeers[i].Identity) == corenet.Connected { i = (i + 1) % len(lanPeers) if testPeer.PeerHost.Network().Connectedness(lanPeers[i].Identity) == corenet.Connected { - testPeer.PeerHost.Network().ClosePeer(lanPeers[i].Identity) + _ = testPeer.PeerHost.Network().ClosePeer(lanPeers[i].Identity) testPeer.PeerHost.Peerstore().ClearAddrs(lanPeers[i].Identity) } } @@ -230,7 +230,7 @@ WanStartupWait: if testPeer.PeerHost.Network().Connectedness(wanPeers[i].Identity) == corenet.Connected { i = (i + 1) % len(wanPeers) if testPeer.PeerHost.Network().Connectedness(wanPeers[i].Identity) == corenet.Connected { - testPeer.PeerHost.Network().ClosePeer(wanPeers[i].Identity) + _ = testPeer.PeerHost.Network().ClosePeer(wanPeers[i].Identity) testPeer.PeerHost.Peerstore().ClearAddrs(wanPeers[i].Identity) } } @@ -254,7 +254,7 @@ WanStartupWait: // Finally, re-share the lan provided cid from a wan peer and expect a merged result. i = rand.Intn(len(wanPeers)) if testPeer.PeerHost.Network().Connectedness(wanPeers[i].Identity) == corenet.Connected { - testPeer.PeerHost.Network().ClosePeer(wanPeers[i].Identity) + _ = testPeer.PeerHost.Network().ClosePeer(wanPeers[i].Identity) testPeer.PeerHost.Peerstore().ClearAddrs(wanPeers[i].Identity) }