Skip to content

Commit

Permalink
feat: kademlia blocklists peer on ping timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
istae committed Sep 27, 2021
1 parent 8ff66db commit f90fc67
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 17 deletions.
23 changes: 16 additions & 7 deletions pkg/blocker/blocker_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package blocker_test

import (
Expand All @@ -16,7 +20,7 @@ func TestBlocksAfterFlagTimeout(t *testing.T) {

blocked := make(map[string]time.Duration)

mock := mockBlockLister(func(a swarm.Address, d time.Duration) error {
mock := mockBlockLister(func(a swarm.Address, d time.Duration, r string) error {
blocked[a.ByteString()] = d
return nil
})
Expand All @@ -36,7 +40,12 @@ func TestBlocksAfterFlagTimeout(t *testing.T) {
t.Fatal("blocker did not wait flag duration")
}

time.Sleep(checkTime)
midway := time.After(flagTime / 2)
check := time.After(checkTime)

<-midway
b.Flag(addr) // check thats this flag call does not overide previous call
<-check

blockedTime, ok := blocked[addr.ByteString()]
if !ok {
Expand All @@ -52,7 +61,7 @@ func TestUnflagBeforeBlock(t *testing.T) {

blocked := make(map[string]time.Duration)

mock := mockBlockLister(func(a swarm.Address, d time.Duration) error {
mock := mockBlockLister(func(a swarm.Address, d time.Duration, r string) error {
blocked[a.ByteString()] = d
return nil
})
Expand Down Expand Up @@ -83,15 +92,15 @@ func TestUnflagBeforeBlock(t *testing.T) {
}

type blocklister struct {
blocklistFunc func(swarm.Address, time.Duration) error
blocklistFunc func(swarm.Address, time.Duration, string) error
}

func mockBlockLister(f func(swarm.Address, time.Duration) error) *blocklister {
func mockBlockLister(f func(swarm.Address, time.Duration, string) error) *blocklister {
return &blocklister{
blocklistFunc: f,
}
}

func (b *blocklister) Blocklist(addr swarm.Address, t time.Duration) error {
return b.blocklistFunc(addr, t)
func (b *blocklister) Blocklist(addr swarm.Address, t time.Duration, r string) error {
return b.blocklistFunc(addr, t, r)
}
14 changes: 6 additions & 8 deletions pkg/blocker/blockler.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package blocker

import (
Expand All @@ -9,12 +13,6 @@ import (
"github.com/ethersphere/bee/pkg/swarm"
)

// var (
// flagTimeout = 5 * time.Minute // how long before blocking a flagged peer
// cleanupTimeout = 5 * time.Minute // how long before we cleanup a peer
// blockDuration = time.Hour // how long to blocklist an unresponsive peer for
// )

type peer struct {
flagged bool // indicates whether the peer is actively flagged
blockAfter time.Time // timestamp of the point we've timed-out or got an error from a peer
Expand Down Expand Up @@ -76,8 +74,8 @@ func (b *Blocker) block() {
defer b.mux.Unlock()

for key, peer := range b.peers {
if time.Now().After(peer.blockAfter) {
if err := b.disconnector.Blocklist(peer.addr, b.blockDuration); err != nil {
if peer.flagged && time.Now().After(peer.blockAfter) {
if err := b.disconnector.Blocklist(peer.addr, b.blockDuration, "blocker: flag timeout"); err != nil {
b.logger.Warningf("blocker: blocking peer %s failed: %v", peer.addr, err)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type Service interface {
}

type Disconnecter interface {
Disconnect(overlay swarm.Address) error
Disconnect(overlay swarm.Address, reason string) error
Blocklister
}

Expand Down
10 changes: 9 additions & 1 deletion pkg/topology/kademlia/kademlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/ethersphere/bee/pkg/addressbook"
"github.com/ethersphere/bee/pkg/blocker"
"github.com/ethersphere/bee/pkg/discovery"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
Expand All @@ -36,6 +37,9 @@ const (
addPeerBatchSize = 500

peerConnectionAttemptTimeout = 5 * time.Second // Timeout for establishing a new connection with peer.

flagTimeout = 5 * time.Minute // how long before blocking a flagged peer
blockDuration = time.Hour // how long to blocklist an unresponsive peer for
)

var (
Expand Down Expand Up @@ -105,6 +109,7 @@ type Kad struct {
pruneFunc pruneFunc // pluggable prune function
pinger pingpong.Interface
staticPeer staticPeerFunc
blocker *blocker.Blocker
}

// New returns a new Kademlia.
Expand Down Expand Up @@ -160,6 +165,7 @@ func New(
pruneFunc: o.PruneFunc,
pinger: pinger,
staticPeer: isStaticPeer(o.StaticNodes),
blocker: blocker.New(p2p, flagTimeout, blockDuration, logger),
}

if k.pruneFunc == nil {
Expand Down Expand Up @@ -455,7 +461,7 @@ func (k *Kad) manage() {
return
case <-k.quit:
return
case <-time.After(1 * time.Second):
case <-time.After(5 * time.Second):
k.wg.Add(1)
go func() {
defer k.wg.Done()
Expand Down Expand Up @@ -538,8 +544,10 @@ func (k *Kad) recordPeerLatencies(ctx context.Context) {
l, err := k.pinger.Ping(ctx, addr, "ping")
if err != nil {
k.logger.Tracef("kademlia: cannot get latency for peer %s: %v", addr.String(), err)
k.blocker.Flag(addr)
return
}
k.blocker.Unflag(addr)
k.collector.Record(addr, im.PeerLatency(l))
v := k.collector.Inspect(addr).LatencyEWMA
k.metrics.PeerLatencyEWMA.Observe(v.Seconds())
Expand Down

0 comments on commit f90fc67

Please sign in to comment.