From b1c2f908359aafd71e852a21ceb3b0bfa9d52ce3 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Wed, 1 Apr 2020 14:39:49 -0700 Subject: [PATCH 1/3] change backoffs to per-address --- dial_test.go | 19 ++++++++------- go.mod | 2 +- go.sum | 1 + swarm_dial.go | 65 ++++++++++++++++++++++++++++++++++----------------- 4 files changed, 56 insertions(+), 31 deletions(-) diff --git a/dial_test.go b/dial_test.go index 73bd02c6..49c21fd6 100644 --- a/dial_test.go +++ b/dial_test.go @@ -200,7 +200,7 @@ func TestDialWait(t *testing.T) { t.Error("> 2*transport.DialTimeout * DialAttempts not being respected", duration, 2*transport.DialTimeout*DialAttempts) } - if !s1.Backoff().Backoff(s2p) { + if !s1.Backoff().Backoff(s2p, s2addr) { t.Error("s2 should now be on backoff") } } @@ -337,10 +337,10 @@ func TestDialBackoff(t *testing.T) { } // check backoff state - if s1.Backoff().Backoff(s2.LocalPeer()) { + if s1.Backoff().Backoff(s2.LocalPeer(), s2addrs[0]) { t.Error("s2 should not be on backoff") } - if !s1.Backoff().Backoff(s3p) { + if !s1.Backoff().Backoff(s3p, s3addr) { t.Error("s3 should be on backoff") } @@ -407,10 +407,10 @@ func TestDialBackoff(t *testing.T) { } // check backoff state (the same) - if s1.Backoff().Backoff(s2.LocalPeer()) { + if s1.Backoff().Backoff(s2.LocalPeer(), s2addrs[0]) { t.Error("s2 should not be on backoff") } - if !s1.Backoff().Backoff(s3p) { + if !s1.Backoff().Backoff(s3p, s3addr) { t.Error("s3 should be on backoff") } } @@ -451,7 +451,7 @@ func TestDialBackoffClears(t *testing.T) { t.Error("> 2*transport.DialTimeout * DialAttempts not being respected", duration, 2*transport.DialTimeout*DialAttempts) } - if !s1.Backoff().Backoff(s2.LocalPeer()) { + if !s1.Backoff().Backoff(s2.LocalPeer(), s2bad) { t.Error("s2 should now be on backoff") } else { t.Log("correctly added to backoff") @@ -464,8 +464,9 @@ func TestDialBackoffClears(t *testing.T) { } s1.Peerstore().AddAddrs(s2.LocalPeer(), ifaceAddrs1, peerstore.PermanentAddrTTL) - if _, err := s1.DialPeer(ctx, s2.LocalPeer()); err == nil { - t.Fatal("should have failed to dial backed off peer") + if c, err := s1.DialPeer(ctx, s2.LocalPeer()); err == nil { + c.Close() + t.Log("backoffs are per address, not peer") } time.Sleep(BackoffBase) @@ -477,7 +478,7 @@ func TestDialBackoffClears(t *testing.T) { t.Log("correctly connected") } - if s1.Backoff().Backoff(s2.LocalPeer()) { + if s1.Backoff().Backoff(s2.LocalPeer(), s2bad) { t.Error("s2 should no longer be on backoff") } else { t.Log("correctly cleared backoff") diff --git a/go.mod b/go.mod index 5b475841..f451e713 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ require ( github.com/jbenet/goprocess v0.1.4 github.com/libp2p/go-addr-util v0.0.1 github.com/libp2p/go-conn-security-multistream v0.1.0 - github.com/libp2p/go-libp2p-core v0.4.0 + github.com/libp2p/go-libp2p-core v0.5.0 github.com/libp2p/go-libp2p-loggables v0.1.0 github.com/libp2p/go-libp2p-peerstore v0.2.2 github.com/libp2p/go-libp2p-secio v0.2.1 diff --git a/go.sum b/go.sum index 96113d65..40e8331a 100644 --- a/go.sum +++ b/go.sum @@ -274,6 +274,7 @@ github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFSt github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= diff --git a/swarm_dial.go b/swarm_dial.go index f2b21abf..40821644 100644 --- a/swarm_dial.go +++ b/swarm_dial.go @@ -101,7 +101,9 @@ type DialBackoff struct { lock sync.RWMutex } -type backoffPeer struct { +type backoffPeer map[ma.Multiaddr]*backoffAddr + +type backoffAddr struct { tries int until time.Time } @@ -113,14 +115,18 @@ func (db *DialBackoff) init() { } // Backoff returns whether the client should backoff from dialing -// peer p -func (db *DialBackoff) Backoff(p peer.ID) (backoff bool) { +// peer p at address addr +func (db *DialBackoff) Backoff(p peer.ID, addr ma.Multiaddr) (backoff bool) { db.lock.Lock() defer db.lock.Unlock() db.init() bp, found := db.entries[p] - if found && time.Now().Before(bp.until) { - return true + if found && bp != nil { + ap, found := (*bp)[addr] + // TODO: cleanup out of date entries. + if found && time.Now().Before(ap.until) { + return true + } } return false @@ -145,25 +151,36 @@ var BackoffMax = time.Minute * 5 // BackoffBase + BakoffCoef * PriorBackoffs^2 // // Where PriorBackoffs is the number of previous backoffs. -func (db *DialBackoff) AddBackoff(p peer.ID) { +func (db *DialBackoff) AddBackoff(p peer.ID, addr ma.Multiaddr) { db.lock.Lock() defer db.lock.Unlock() db.init() bp, ok := db.entries[p] if !ok { - db.entries[p] = &backoffPeer{ + bp := backoffPeer(make(map[ma.Multiaddr]*backoffAddr)) + db.entries[p] = &bp + bp[addr] = &backoffAddr{ + tries: 1, + until: time.Now().Add(BackoffBase), + } + return + } + // todo: cleanup out of date entries. + ba, ok := (*bp)[addr] + if !ok { + (*bp)[addr] = &backoffAddr{ tries: 1, until: time.Now().Add(BackoffBase), } return } - backoffTime := BackoffBase + BackoffCoef*time.Duration(bp.tries*bp.tries) + backoffTime := BackoffBase + BackoffCoef*time.Duration(ba.tries*ba.tries) if backoffTime > BackoffMax { backoffTime = BackoffMax } - bp.until = time.Now().Add(backoffTime) - bp.tries++ + ba.until = time.Now().Add(backoffTime) + ba.tries++ } // Clear removes a backoff record. Clients should call this after a @@ -210,12 +227,6 @@ func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) { return conn, nil } - // if this peer has been backed off, lets get out of here - if s.backf.Backoff(p) { - log.Event(ctx, "swarmDialBackoff", p) - return nil, ErrDialBackoff - } - // apply the DialPeer timeout ctx, cancel := context.WithTimeout(ctx, network.GetDialPeerTimeout(ctx)) defer cancel() @@ -268,10 +279,6 @@ func (s *Swarm) doDial(ctx context.Context, p peer.ID) (*Conn, error) { log.Debugf("ignoring dial error because we have a connection: %s", err) return conn, nil } - if err != context.Canceled { - log.Event(ctx, "swarmDialBackoffAdd", logdial) - s.backf.AddBackoff(p) // let others know to backoff - } // ok, we failed. return nil, err @@ -318,10 +325,18 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) { return nil, &DialError{Peer: p, Cause: ErrNoGoodAddresses} } goodAddrsChan := make(chan ma.Multiaddr, len(goodAddrs)) + nonBackoff := false for _, a := range goodAddrs { - goodAddrsChan <- a + // skip addresses in back-off + if !s.backf.Backoff(p, a) { + nonBackoff = true + goodAddrsChan <- a + } } close(goodAddrsChan) + if !nonBackoff { + return nil, ErrDialBackoff + } ///////// // try to get a connection to any addr @@ -402,6 +417,10 @@ dialLoop: active-- if resp.Err != nil { // Errors are normal, lots of dials will fail + if resp.Err != context.Canceled { + s.backf.AddBackoff(p, resp.Addr) + } + log.Infof("got error on dial: %s", resp.Err) err.recordErr(resp.Addr, resp.Err) } else if resp.Conn != nil { @@ -429,6 +448,10 @@ dialLoop: active-- if resp.Err != nil { // Errors are normal, lots of dials will fail + if resp.Err != context.Canceled { + s.backf.AddBackoff(p, resp.Addr) + } + log.Infof("got error on dial: %s", resp.Err) err.recordErr(resp.Addr, resp.Err) } else if resp.Conn != nil { From a68f7aebb865089ffaea18421c7af678c42f3ff7 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Wed, 1 Apr 2020 15:19:38 -0700 Subject: [PATCH 2/3] add background cleanup task --- swarm.go | 1 + swarm_dial.go | 57 +++++++++++++++++++++++++++++++++++++++------------ 2 files changed, 45 insertions(+), 13 deletions(-) diff --git a/swarm.go b/swarm.go index fe4e1ee9..198d88a7 100644 --- a/swarm.go +++ b/swarm.go @@ -114,6 +114,7 @@ func NewSwarm(ctx context.Context, local peer.ID, peers peerstore.Peerstore, bwc s.limiter = newDialLimiter(s.dialAddr) s.proc = goprocessctx.WithContext(ctx) s.ctx = goprocessctx.OnClosingContext(s.proc) + s.backf.init(s.ctx) // Set teardown after setting the context/process so we don't start the // teardown process early. diff --git a/swarm_dial.go b/swarm_dial.go index 40821644..9c2886fc 100644 --- a/swarm_dial.go +++ b/swarm_dial.go @@ -97,7 +97,7 @@ const DefaultPerPeerRateLimit = 8 // * It's thread-safe. // * It's *not* safe to move this type after using. type DialBackoff struct { - entries map[peer.ID]*backoffPeer + entries map[peer.ID]backoffPeer lock sync.RWMutex } @@ -108,9 +108,23 @@ type backoffAddr struct { until time.Time } -func (db *DialBackoff) init() { +func (db *DialBackoff) init(ctx context.Context) { if db.entries == nil { - db.entries = make(map[peer.ID]*backoffPeer) + db.entries = make(map[peer.ID]backoffPeer) + } + go db.background(ctx) +} + +func (db *DialBackoff) background(ctx context.Context) { + ticker := time.NewTicker(BackoffMax) + for { + select { + case <-ctx.Done(): + ticker.Stop() + return + case <-ticker.C: + db.cleanup() + } } } @@ -119,10 +133,9 @@ func (db *DialBackoff) init() { func (db *DialBackoff) Backoff(p peer.ID, addr ma.Multiaddr) (backoff bool) { db.lock.Lock() defer db.lock.Unlock() - db.init() bp, found := db.entries[p] if found && bp != nil { - ap, found := (*bp)[addr] + ap, found := bp[addr] // TODO: cleanup out of date entries. if found && time.Now().Before(ap.until) { return true @@ -154,21 +167,18 @@ var BackoffMax = time.Minute * 5 func (db *DialBackoff) AddBackoff(p peer.ID, addr ma.Multiaddr) { db.lock.Lock() defer db.lock.Unlock() - db.init() bp, ok := db.entries[p] if !ok { - bp := backoffPeer(make(map[ma.Multiaddr]*backoffAddr)) - db.entries[p] = &bp - bp[addr] = &backoffAddr{ + db.entries[p] = backoffPeer(make(map[ma.Multiaddr]*backoffAddr)) + db.entries[p][addr] = &backoffAddr{ tries: 1, until: time.Now().Add(BackoffBase), } return } - // todo: cleanup out of date entries. - ba, ok := (*bp)[addr] + ba, ok := bp[addr] if !ok { - (*bp)[addr] = &backoffAddr{ + bp[addr] = &backoffAddr{ tries: 1, until: time.Now().Add(BackoffBase), } @@ -188,10 +198,31 @@ func (db *DialBackoff) AddBackoff(p peer.ID, addr ma.Multiaddr) { func (db *DialBackoff) Clear(p peer.ID) { db.lock.Lock() defer db.lock.Unlock() - db.init() delete(db.entries, p) } +func (db *DialBackoff) cleanup() { + db.lock.Lock() + defer db.lock.Unlock() + now := time.Now() + deletePeers := []peer.ID{} + for p, e := range db.entries { + good := false + for _, backoff := range e { + if now.Before(backoff.until) { + good = true + break + } + } + if !good { + deletePeers = append(deletePeers, p) + } + } + for _, p := range deletePeers { + delete(db.entries, p) + } +} + // DialPeer connects to a peer. // // The idea is that the client of Swarm does not need to know what network From b49b1c48454f2beced486f5077975fbfc4d68eb4 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Thu, 2 Apr 2020 09:15:44 -0700 Subject: [PATCH 3/3] simplify data structure --- swarm_dial.go | 28 +++++++++++----------------- 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/swarm_dial.go b/swarm_dial.go index 9c2886fc..75c9ad26 100644 --- a/swarm_dial.go +++ b/swarm_dial.go @@ -97,12 +97,10 @@ const DefaultPerPeerRateLimit = 8 // * It's thread-safe. // * It's *not* safe to move this type after using. type DialBackoff struct { - entries map[peer.ID]backoffPeer + entries map[peer.ID]map[string]*backoffAddr lock sync.RWMutex } -type backoffPeer map[ma.Multiaddr]*backoffAddr - type backoffAddr struct { tries int until time.Time @@ -110,17 +108,17 @@ type backoffAddr struct { func (db *DialBackoff) init(ctx context.Context) { if db.entries == nil { - db.entries = make(map[peer.ID]backoffPeer) + db.entries = make(map[peer.ID]map[string]*backoffAddr) } go db.background(ctx) } func (db *DialBackoff) background(ctx context.Context) { ticker := time.NewTicker(BackoffMax) + defer ticker.Stop() for { select { case <-ctx.Done(): - ticker.Stop() return case <-ticker.C: db.cleanup() @@ -134,9 +132,8 @@ func (db *DialBackoff) Backoff(p peer.ID, addr ma.Multiaddr) (backoff bool) { db.lock.Lock() defer db.lock.Unlock() bp, found := db.entries[p] - if found && bp != nil { - ap, found := bp[addr] - // TODO: cleanup out of date entries. + if found { + ap, found := bp[string(addr.Bytes())] if found && time.Now().Before(ap.until) { return true } @@ -165,20 +162,21 @@ var BackoffMax = time.Minute * 5 // // Where PriorBackoffs is the number of previous backoffs. func (db *DialBackoff) AddBackoff(p peer.ID, addr ma.Multiaddr) { + saddr := string(addr.Bytes()) db.lock.Lock() defer db.lock.Unlock() bp, ok := db.entries[p] if !ok { - db.entries[p] = backoffPeer(make(map[ma.Multiaddr]*backoffAddr)) - db.entries[p][addr] = &backoffAddr{ + db.entries[p] = make(map[string]*backoffAddr) + db.entries[p][saddr] = &backoffAddr{ tries: 1, until: time.Now().Add(BackoffBase), } return } - ba, ok := bp[addr] + ba, ok := bp[saddr] if !ok { - bp[addr] = &backoffAddr{ + bp[saddr] = &backoffAddr{ tries: 1, until: time.Now().Add(BackoffBase), } @@ -205,7 +203,6 @@ func (db *DialBackoff) cleanup() { db.lock.Lock() defer db.lock.Unlock() now := time.Now() - deletePeers := []peer.ID{} for p, e := range db.entries { good := false for _, backoff := range e { @@ -215,12 +212,9 @@ func (db *DialBackoff) cleanup() { } } if !good { - deletePeers = append(deletePeers, p) + delete(db.entries, p) } } - for _, p := range deletePeers { - delete(db.entries, p) - } } // DialPeer connects to a peer.