Skip to content

Commit

Permalink
Merge pull request #26 from libp2p/fix/fdconsuming
Browse files Browse the repository at this point in the history
Fix dialLimiter.fdConsuming counting
  • Loading branch information
Stebalien authored Jul 31, 2017
2 parents 527805d + 07e3a6d commit 50e4c42
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 4 deletions.
19 changes: 15 additions & 4 deletions p2p/net/swarm/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ func newDialLimiter(df dialfunc) *dialLimiter {
return newDialLimiterWithParams(df, concurrentFdDials, defaultPerPeerRateLimit)
}

func newDialLimiterWithParams(df dialfunc, fdl, ppl int) *dialLimiter {
func newDialLimiterWithParams(df dialfunc, fdLimit, perPeerLimit int) *dialLimiter {
return &dialLimiter{
fdLimit: fdl,
perPeerLimit: ppl,
fdLimit: fdLimit,
perPeerLimit: perPeerLimit,
waitingOnPeerLimit: make(map[peer.ID][]*dialJob),
activePerPeer: make(map[peer.ID]int),
dialFunc: df,
Expand All @@ -68,6 +68,7 @@ func (dl *dialLimiter) finishedDial(dj *dialJob) {

if addrutil.IsFDCostlyTransport(dj.addr) {
dl.fdConsuming--

if len(dl.waitingOnFd) > 0 {
next := dl.waitingOnFd[0]
dl.waitingOnFd = dl.waitingOnFd[1:]
Expand All @@ -89,18 +90,28 @@ func (dl *dialLimiter) finishedDial(dj *dialJob) {
waitlist := dl.waitingOnPeerLimit[dj.peer]
if !dj.success && len(waitlist) > 0 {
next := waitlist[0]

if len(waitlist) == 1 {
delete(dl.waitingOnPeerLimit, dj.peer)
} else {
dl.waitingOnPeerLimit[dj.peer] = waitlist[1:]
}
dl.activePerPeer[dj.peer]++ // just kidding, we still want this token

if addrutil.IsFDCostlyTransport(next.addr) {
if dl.fdConsuming >= dl.fdLimit {
dl.waitingOnFd = append(dl.waitingOnFd, next)
return
}

// take token
dl.fdConsuming++
}

// can kick this off right here, dials in this list already
// have the other tokens needed
go dl.executeDial(next)
}

}

// AddDialJob tries to take the needed tokens for starting the given dial job.
Expand Down
66 changes: 66 additions & 0 deletions p2p/net/swarm/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,3 +320,69 @@ func TestStressLimiter(t *testing.T) {
}
}
}

func TestFDLimitUnderflow(t *testing.T) {
dials := 0

df := func(ctx context.Context, p peer.ID, a ma.Multiaddr) (iconn.Conn, error) {
dials++

timeout := make(chan bool, 1)
go func() {
time.Sleep(time.Second * 5)
timeout <- true
}()

select {
case <-ctx.Done():
case <-timeout:
}

return nil, fmt.Errorf("df timed out")
}

l := newDialLimiterWithParams(df, 20, 3)

var addrs []ma.Multiaddr
for i := 0; i <= 1000; i++ {
addrs = append(addrs, addrWithPort(t, i))
}

for i := 0; i < 1000; i++ {
go func(id peer.ID, i int) {
ctx, cancel := context.WithCancel(context.Background())

resp := make(chan dialResult)
l.AddDialJob(&dialJob{
addr: addrs[i],
ctx: ctx,
peer: id,
resp: resp,
})

//cancel first 60 after 1s, next 60 after 2s
if i > 60 {
time.Sleep(time.Second * 1)
}
if i < 120 {
time.Sleep(time.Second * 1)
cancel()
return
}
defer cancel()

for res := range resp {
if res.Err != nil {
return
}
t.Fatal("got dial res, shouldn't")
}
}(peer.ID(fmt.Sprintf("testpeer%d", i % 20)), i)
}

time.Sleep(time.Second * 3)

if l.fdConsuming < 0 {
t.Fatalf("l.fdConsuming < 0")
}
}

0 comments on commit 50e4c42

Please sign in to comment.