Skip to content

Commit

Permalink
Merge pull request ipfs#277 from libp2p/fix/276
Browse files Browse the repository at this point in the history
fix(dialQueue): account for failed dials
  • Loading branch information
raulk authored Feb 27, 2019
2 parents f0215fc + c78d1e6 commit d724d73
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 1 deletion.
35 changes: 35 additions & 0 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -901,6 +901,41 @@ func TestLayeredGet(t *testing.T) {
}
}

func TestUnfindablePeer(t *testing.T) {
if testing.Short() {
t.SkipNow()
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

maddrs, peers, dhts := setupDHTS(ctx, 4, t)
defer func() {
for i := 0; i < 4; i++ {
dhts[i].Close()
dhts[i].host.Close()
}
}()

connect(t, ctx, dhts[0], dhts[1])
connect(t, ctx, dhts[1], dhts[2])
connect(t, ctx, dhts[2], dhts[3])

// Give DHT 1 a bad addr for DHT 2.
dhts[1].host.Peerstore().ClearAddrs(peers[2])
dhts[1].host.Peerstore().AddAddr(peers[2], maddrs[0], time.Minute)

ctxT, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
_, err := dhts[0].FindPeer(ctxT, peers[3])
if err == nil {
t.Error("should have failed to find peer")
}
if ctxT.Err() != nil {
t.Error("FindPeer should have failed before context expired")
}
}

func TestFindPeer(t *testing.T) {
// t.Skip("skipping test to debug another")
if testing.Short() {
Expand Down
6 changes: 5 additions & 1 deletion dial_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,11 @@ func (dq *dialQueue) worker() {
return
case <-idleTimer.C:
// no new dial requests during our idle period; time to scale down.
case p := <-dq.in.DeqChan:
case p, ok := <-dq.in.DeqChan:
if !ok {
return
}

t := time.Now()
if err := dq.dialFn(dq.ctx, p); err != nil {
logger.Debugf("discarding dialled peer because of error: %v", err)
Expand Down
3 changes: 3 additions & 0 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,9 @@ func (r *dhtQueryRunner) dialPeer(ctx context.Context, p peer.ID) error {
r.Lock()
r.errs = append(r.errs, err)
r.Unlock()

// This peer is dropping out of the race.
r.peersRemaining.Decrement(1)
return err
}
logger.Debugf("connected. dial success.")
Expand Down

0 comments on commit d724d73

Please sign in to comment.