Skip to content

Commit

Permalink
don't execute cancelled jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
whyrusleeping committed May 6, 2016
1 parent d7121bf commit 74a15be
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 7 deletions.
13 changes: 13 additions & 0 deletions p2p/net/swarm/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@ type dialJob struct {
success bool
}

func (dj *dialJob) cancelled() bool {
select {
case <-dj.ctx.Done():
return true
default:
return false
}
}

type dialLimiter struct {
rllock sync.Mutex
fdConsuming int
Expand Down Expand Up @@ -116,6 +125,10 @@ func (dl *dialLimiter) AddDialJob(dj *dialJob) {
// it held during the dial.
func (dl *dialLimiter) executeDial(j *dialJob) {
defer dl.finishedDial(j)
if j.cancelled() {
return
}

con, err := dl.dialFunc(j.ctx, j.peer, j.addr)
select {
case j.resp <- dialResult{Conn: con, Err: err}:
Expand Down
64 changes: 57 additions & 7 deletions p2p/net/swarm/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package swarm

import (
"fmt"
"math/rand"
"strconv"
"testing"
"time"
Expand Down Expand Up @@ -75,13 +76,7 @@ func TestLimiterBasicDials(t *testing.T) {

l := newDialLimiterWithParams(hangDialFunc(hang), concurrentFdDials, 4)

bads := []ma.Multiaddr{
addrWithPort(t, 1),
addrWithPort(t, 2),
addrWithPort(t, 3),
addrWithPort(t, 4),
}

bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)}
good := addrWithPort(t, 20)

resch := make(chan dialResult)
Expand Down Expand Up @@ -162,6 +157,7 @@ func TestFDLimiting(t *testing.T) {
pid5 := peer.ID("testpeer5")
utpaddr := mustAddr(t, "/ip4/127.0.0.1/udp/7777/utp")

// This should complete immediately since utp addresses arent blocked by fd rate limiting
l.AddDialJob(&dialJob{ctx: ctx, peer: pid5, addr: utpaddr, resp: resch})

select {
Expand Down Expand Up @@ -263,3 +259,57 @@ func TestTokenRedistribution(t *testing.T) {
t.Fatal("should have gotten successful dial")
}
}

func TestStressLimiter(t *testing.T) {
df := func(ctx context.Context, p peer.ID, a ma.Multiaddr) (conn.Conn, error) {
if tcpPortOver(a, 1000) {
return conn.Conn(nil), nil
} else {
time.Sleep(time.Millisecond * time.Duration(5+rand.Intn(100)))
return nil, fmt.Errorf("test bad dial")
}
}

l := newDialLimiterWithParams(df, 20, 5)

var bads []ma.Multiaddr
for i := 0; i < 100; i++ {
bads = append(bads, addrWithPort(t, i))
}

addresses := append(bads, addrWithPort(t, 2000))
success := make(chan struct{})

for i := 0; i < 20; i++ {
go func(id peer.ID) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

resp := make(chan dialResult)
time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
for _, i := range rand.Perm(len(addresses)) {
l.AddDialJob(&dialJob{
addr: addresses[i],
ctx: ctx,
peer: id,
resp: resp,
})
}

for res := range resp {
if res.Err == nil {
success <- struct{}{}
return
}
}
}(peer.ID(fmt.Sprintf("testpeer%d", i)))
}

for i := 0; i < 20; i++ {
select {
case <-success:
case <-time.After(time.Second * 5):
t.Fatal("expected a success within five seconds")
}
}
}

0 comments on commit 74a15be

Please sign in to comment.