Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

Commit

Permalink
Merge pull request #191 from libp2p/feat/addrbackoff
Browse files Browse the repository at this point in the history
change backoffs to per-address
  • Loading branch information
Stebalien authored Apr 2, 2020
2 parents b6831d4 + b49b1c4 commit 6711d98
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 37 deletions.
19 changes: 10 additions & 9 deletions dial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func TestDialWait(t *testing.T) {
t.Error("> 2*transport.DialTimeout * DialAttempts not being respected", duration, 2*transport.DialTimeout*DialAttempts)
}

if !s1.Backoff().Backoff(s2p) {
if !s1.Backoff().Backoff(s2p, s2addr) {
t.Error("s2 should now be on backoff")
}
}
Expand Down Expand Up @@ -337,10 +337,10 @@ func TestDialBackoff(t *testing.T) {
}

// check backoff state
if s1.Backoff().Backoff(s2.LocalPeer()) {
if s1.Backoff().Backoff(s2.LocalPeer(), s2addrs[0]) {
t.Error("s2 should not be on backoff")
}
if !s1.Backoff().Backoff(s3p) {
if !s1.Backoff().Backoff(s3p, s3addr) {
t.Error("s3 should be on backoff")
}

Expand Down Expand Up @@ -407,10 +407,10 @@ func TestDialBackoff(t *testing.T) {
}

// check backoff state (the same)
if s1.Backoff().Backoff(s2.LocalPeer()) {
if s1.Backoff().Backoff(s2.LocalPeer(), s2addrs[0]) {
t.Error("s2 should not be on backoff")
}
if !s1.Backoff().Backoff(s3p) {
if !s1.Backoff().Backoff(s3p, s3addr) {
t.Error("s3 should be on backoff")
}
}
Expand Down Expand Up @@ -451,7 +451,7 @@ func TestDialBackoffClears(t *testing.T) {
t.Error("> 2*transport.DialTimeout * DialAttempts not being respected", duration, 2*transport.DialTimeout*DialAttempts)
}

if !s1.Backoff().Backoff(s2.LocalPeer()) {
if !s1.Backoff().Backoff(s2.LocalPeer(), s2bad) {
t.Error("s2 should now be on backoff")
} else {
t.Log("correctly added to backoff")
Expand All @@ -464,8 +464,9 @@ func TestDialBackoffClears(t *testing.T) {
}
s1.Peerstore().AddAddrs(s2.LocalPeer(), ifaceAddrs1, peerstore.PermanentAddrTTL)

if _, err := s1.DialPeer(ctx, s2.LocalPeer()); err == nil {
t.Fatal("should have failed to dial backed off peer")
if c, err := s1.DialPeer(ctx, s2.LocalPeer()); err == nil {
c.Close()
t.Log("backoffs are per address, not peer")
}

time.Sleep(BackoffBase)
Expand All @@ -477,7 +478,7 @@ func TestDialBackoffClears(t *testing.T) {
t.Log("correctly connected")
}

if s1.Backoff().Backoff(s2.LocalPeer()) {
if s1.Backoff().Backoff(s2.LocalPeer(), s2bad) {
t.Error("s2 should no longer be on backoff")
} else {
t.Log("correctly cleared backoff")
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ require (
github.com/jbenet/goprocess v0.1.4
github.com/libp2p/go-addr-util v0.0.1
github.com/libp2p/go-conn-security-multistream v0.1.0
github.com/libp2p/go-libp2p-core v0.4.0
github.com/libp2p/go-libp2p-core v0.5.0
github.com/libp2p/go-libp2p-loggables v0.1.0
github.com/libp2p/go-libp2p-peerstore v0.2.2
github.com/libp2p/go-libp2p-secio v0.2.1
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFSt
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand Down
1 change: 1 addition & 0 deletions swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func NewSwarm(ctx context.Context, local peer.ID, peers peerstore.Peerstore, bwc
s.limiter = newDialLimiter(s.dialAddr)
s.proc = goprocessctx.WithContext(ctx)
s.ctx = goprocessctx.OnClosingContext(s.proc)
s.backf.init(s.ctx)

// Set teardown after setting the context/process so we don't start the
// teardown process early.
Expand Down
102 changes: 75 additions & 27 deletions swarm_dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,30 +97,46 @@ const DefaultPerPeerRateLimit = 8
// * It's thread-safe.
// * It's *not* safe to move this type after using.
type DialBackoff struct {
entries map[peer.ID]*backoffPeer
entries map[peer.ID]map[string]*backoffAddr
lock sync.RWMutex
}

type backoffPeer struct {
type backoffAddr struct {
tries int
until time.Time
}

func (db *DialBackoff) init() {
func (db *DialBackoff) init(ctx context.Context) {
if db.entries == nil {
db.entries = make(map[peer.ID]*backoffPeer)
db.entries = make(map[peer.ID]map[string]*backoffAddr)
}
go db.background(ctx)
}

func (db *DialBackoff) background(ctx context.Context) {
ticker := time.NewTicker(BackoffMax)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
db.cleanup()
}
}
}

// Backoff returns whether the client should backoff from dialing
// peer p
func (db *DialBackoff) Backoff(p peer.ID) (backoff bool) {
// peer p at address addr
func (db *DialBackoff) Backoff(p peer.ID, addr ma.Multiaddr) (backoff bool) {
db.lock.Lock()
defer db.lock.Unlock()
db.init()
bp, found := db.entries[p]
if found && time.Now().Before(bp.until) {
return true
if found {
ap, found := bp[string(addr.Bytes())]
if found && time.Now().Before(ap.until) {
return true
}
}

return false
Expand All @@ -145,36 +161,62 @@ var BackoffMax = time.Minute * 5
// BackoffBase + BakoffCoef * PriorBackoffs^2
//
// Where PriorBackoffs is the number of previous backoffs.
func (db *DialBackoff) AddBackoff(p peer.ID) {
func (db *DialBackoff) AddBackoff(p peer.ID, addr ma.Multiaddr) {
saddr := string(addr.Bytes())
db.lock.Lock()
defer db.lock.Unlock()
db.init()
bp, ok := db.entries[p]
if !ok {
db.entries[p] = &backoffPeer{
db.entries[p] = make(map[string]*backoffAddr)
db.entries[p][saddr] = &backoffAddr{
tries: 1,
until: time.Now().Add(BackoffBase),
}
return
}
ba, ok := bp[saddr]
if !ok {
bp[saddr] = &backoffAddr{
tries: 1,
until: time.Now().Add(BackoffBase),
}
return
}

backoffTime := BackoffBase + BackoffCoef*time.Duration(bp.tries*bp.tries)
backoffTime := BackoffBase + BackoffCoef*time.Duration(ba.tries*ba.tries)
if backoffTime > BackoffMax {
backoffTime = BackoffMax
}
bp.until = time.Now().Add(backoffTime)
bp.tries++
ba.until = time.Now().Add(backoffTime)
ba.tries++
}

// Clear removes a backoff record. Clients should call this after a
// successful Dial.
func (db *DialBackoff) Clear(p peer.ID) {
db.lock.Lock()
defer db.lock.Unlock()
db.init()
delete(db.entries, p)
}

func (db *DialBackoff) cleanup() {
db.lock.Lock()
defer db.lock.Unlock()
now := time.Now()
for p, e := range db.entries {
good := false
for _, backoff := range e {
if now.Before(backoff.until) {
good = true
break
}
}
if !good {
delete(db.entries, p)
}
}
}

// DialPeer connects to a peer.
//
// The idea is that the client of Swarm does not need to know what network
Expand Down Expand Up @@ -210,12 +252,6 @@ func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) {
return conn, nil
}

// if this peer has been backed off, lets get out of here
if s.backf.Backoff(p) {
log.Event(ctx, "swarmDialBackoff", p)
return nil, ErrDialBackoff
}

// apply the DialPeer timeout
ctx, cancel := context.WithTimeout(ctx, network.GetDialPeerTimeout(ctx))
defer cancel()
Expand Down Expand Up @@ -268,10 +304,6 @@ func (s *Swarm) doDial(ctx context.Context, p peer.ID) (*Conn, error) {
log.Debugf("ignoring dial error because we have a connection: %s", err)
return conn, nil
}
if err != context.Canceled {
log.Event(ctx, "swarmDialBackoffAdd", logdial)
s.backf.AddBackoff(p) // let others know to backoff
}

// ok, we failed.
return nil, err
Expand Down Expand Up @@ -318,10 +350,18 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
return nil, &DialError{Peer: p, Cause: ErrNoGoodAddresses}
}
goodAddrsChan := make(chan ma.Multiaddr, len(goodAddrs))
nonBackoff := false
for _, a := range goodAddrs {
goodAddrsChan <- a
// skip addresses in back-off
if !s.backf.Backoff(p, a) {
nonBackoff = true
goodAddrsChan <- a
}
}
close(goodAddrsChan)
if !nonBackoff {
return nil, ErrDialBackoff
}
/////////

// try to get a connection to any addr
Expand Down Expand Up @@ -402,6 +442,10 @@ dialLoop:
active--
if resp.Err != nil {
// Errors are normal, lots of dials will fail
if resp.Err != context.Canceled {
s.backf.AddBackoff(p, resp.Addr)
}

log.Infof("got error on dial: %s", resp.Err)
err.recordErr(resp.Addr, resp.Err)
} else if resp.Conn != nil {
Expand Down Expand Up @@ -429,6 +473,10 @@ dialLoop:
active--
if resp.Err != nil {
// Errors are normal, lots of dials will fail
if resp.Err != context.Canceled {
s.backf.AddBackoff(p, resp.Addr)
}

log.Infof("got error on dial: %s", resp.Err)
err.recordErr(resp.Addr, resp.Err)
} else if resp.Conn != nil {
Expand Down

0 comments on commit 6711d98

Please sign in to comment.