Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: kademlia blocklists peer on ping timeout #2543

Merged
merged 8 commits into from
Sep 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 118 additions & 0 deletions pkg/blocker/blocker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package blocker

import (
"sync"
"time"

"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/swarm"
)

// peer is the bookkeeping entry the Blocker keeps for each flagged address.
type peer struct {
	// blockAfter is the deadline computed when the peer is flagged
	// (flag time plus flagTimeout). Once it has passed, the worker
	// blocklists the peer.
	blockAfter time.Time
	// addr is the address handed to the blocklister.
	addr swarm.Address
}

// Blocker keeps track of flagged peers and blocklists the ones that remain
// flagged for longer than flagTimeout. The check is performed by a background
// worker that is started by New and stopped by Close.
type Blocker struct {
	mux           sync.Mutex       // guards peers
	disconnector  p2p.Blocklister  // blocklists peers whose flag deadline has passed
	flagTimeout   time.Duration    // how long before blocking a flagged peer
	blockDuration time.Duration    // how long to blocklist a bad peer
	workerWakeup  time.Duration    // how long the worker waits between two checks of the flagged peers
	peers         map[string]*peer // flagged peers, keyed by addr.ByteString()
	logger        logging.Logger   // receives a warning when blocklisting fails
	wakeupCh      chan struct{}    // NOTE(review): created in New but not otherwise used in this file; confirm it is still needed
	quit          chan struct{}    // closed by Close to stop the worker
	closeWg       sync.WaitGroup   // lets Close wait until the worker has returned
}

// New builds a Blocker and starts its background worker goroutine.
//
// dis is used to blocklist peers, flagTimeout is how long a peer may stay
// flagged before it gets blocklisted, blockDuration is the duration passed to
// the blocklister, and wakeUpTime is how long the worker waits between checks.
// Failures to blocklist are reported through logger.
//
// The returned Blocker must be stopped with Close.
func New(dis p2p.Blocklister, flagTimeout, blockDuration, wakeUpTime time.Duration, logger logging.Logger) *Blocker {
	blk := new(Blocker)

	blk.disconnector = dis
	blk.flagTimeout = flagTimeout
	blk.blockDuration = blockDuration
	blk.workerWakeup = wakeUpTime
	blk.logger = logger

	blk.peers = make(map[string]*peer)
	blk.wakeupCh = make(chan struct{})
	blk.quit = make(chan struct{})

	// closeWg is usable as its zero value; register the worker before it
	// starts so Close can never observe an empty wait group.
	blk.closeWg.Add(1)
	go blk.run()

	return blk
}

// run is the worker loop started by New. It waits workerWakeup, calls block,
// and repeats until the quit channel is closed. closeWg is released on exit so
// that Close can wait for the loop to finish.
func (b *Blocker) run() {
	defer b.closeWg.Done()

	// A single timer is reused for every iteration instead of allocating a
	// new one with time.After each time around the loop. Stopping it on exit
	// releases it immediately rather than leaving it pending until it fires.
	timer := time.NewTimer(b.workerWakeup)
	defer timer.Stop()

	for {
		select {
		case <-b.quit:
			return
		case <-timer.C:
			b.block()
			// The timer has fired and its channel has been drained by the
			// receive above, so Reset is safe. Re-arming after block keeps
			// the full workerWakeup pause between two consecutive checks.
			timer.Reset(b.workerWakeup)
		}
	}
}

func (b *Blocker) block() {
b.mux.Lock()
defer b.mux.Unlock()

for key, peer := range b.peers {

select {
case <-b.quit:
return
default:
}

if !peer.blockAfter.IsZero() && time.Now().After(peer.blockAfter) {
acud marked this conversation as resolved.
Show resolved Hide resolved
if err := b.disconnector.Blocklist(peer.addr, b.blockDuration, "blocker: flag timeout"); err != nil {
acud marked this conversation as resolved.
Show resolved Hide resolved
b.logger.Warningf("blocker: blocking peer %s failed: %v", peer.addr, err)
}
delete(b.peers, key)
}
}
}

// Flag marks addr as misbehaving. The first flag for an address sets its
// deadline to now plus flagTimeout; flagging an address that already has a
// deadline leaves that deadline untouched.
func (b *Blocker) Flag(addr swarm.Address) {
	b.mux.Lock()
	defer b.mux.Unlock()

	key := addr.ByteString()

	existing, found := b.peers[key]
	if !found {
		b.peers[key] = &peer{
			addr:       addr,
			blockAfter: time.Now().Add(b.flagTimeout),
		}
		return
	}

	// Only an entry without a deadline gets one; an existing deadline is kept.
	if existing.blockAfter.IsZero() {
		existing.blockAfter = time.Now().Add(b.flagTimeout)
	}
}

// Unflag forgets addr, cancelling any pending deadline set by Flag.
// Unflagging an address that is not flagged is a no-op.
func (b *Blocker) Unflag(addr swarm.Address) {
	key := addr.ByteString()

	b.mux.Lock()
	delete(b.peers, key)
	b.mux.Unlock()
}

// Close stops the worker loop by closing the quit channel and then waits for
// the worker goroutine to return. It always returns nil.
//
// Close must be called only once: a second call would close the already
// closed quit channel, which panics.
func (b *Blocker) Close() error {
	close(b.quit)
	b.closeWg.Wait()
	return nil
}
123 changes: 123 additions & 0 deletions pkg/blocker/blocker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package blocker_test

import (
"io/ioutil"
"sync"
"testing"
"time"

"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/swarm/test"

"github.com/ethersphere/bee/pkg/blocker"
)

// TestBlocksAfterFlagTimeout flags an address, flags it again halfway through
// the flag timeout, and checks that after checkTime the address has been
// blocklisted with the configured block duration.
func TestBlocksAfterFlagTimeout(t *testing.T) {
	// mux guards blocked, which is written from the blocker's worker goroutine.
	mux := sync.Mutex{}
	blocked := make(map[string]time.Duration)

	mock := mockBlockLister(func(a swarm.Address, d time.Duration, r string) error {
		mux.Lock()
		blocked[a.ByteString()] = d
		mux.Unlock()

		return nil
	})

	logger := logging.New(ioutil.Discard, 0)

	flagTime := time.Millisecond * 25
	checkTime := time.Millisecond * 100
	blockTime := time.Second

	b := blocker.New(mock, flagTime, blockTime, time.Millisecond, logger)
	// Deferred so the worker goroutine is stopped even when t.Fatal aborts the test.
	defer b.Close()

	addr := test.RandomAddress()
	b.Flag(addr)

	// Read under the mutex like every other access to blocked.
	mux.Lock()
	_, blockedEarly := blocked[addr.ByteString()]
	mux.Unlock()
	if blockedEarly {
		t.Fatal("blocker did not wait flag duration")
	}

	midway := time.After(flagTime / 2)
	check := time.After(checkTime)

	<-midway
	b.Flag(addr) // check that this flag call does not override the previous call
	<-check

	mux.Lock()
	blockedTime, ok := blocked[addr.ByteString()]
	mux.Unlock()
	if !ok {
		t.Fatal("address should be blocked")
	}

	if blockedTime != blockTime {
		t.Fatalf("block time: want %v, got %v", blockTime, blockedTime)
	}
}

// TestUnflagBeforeBlock flags an address, unflags it right away, and checks
// that the address has not been blocklisted once checkTime has elapsed.
func TestUnflagBeforeBlock(t *testing.T) {
	// mux guards blocked, which is written from the blocker's worker goroutine.
	mux := sync.Mutex{}
	blocked := make(map[string]time.Duration)

	mock := mockBlockLister(func(a swarm.Address, d time.Duration, r string) error {
		mux.Lock()
		blocked[a.ByteString()] = d
		mux.Unlock()
		return nil
	})

	logger := logging.New(ioutil.Discard, 0)

	flagTime := time.Millisecond * 25
	checkTime := time.Millisecond * 100
	blockTime := time.Second

	b := blocker.New(mock, flagTime, blockTime, time.Millisecond, logger)
	// Deferred so the worker goroutine is stopped even when t.Fatal aborts the test.
	defer b.Close()

	addr := test.RandomAddress()
	b.Flag(addr)

	// Read under the mutex like every other access to blocked.
	mux.Lock()
	_, blockedEarly := blocked[addr.ByteString()]
	mux.Unlock()
	if blockedEarly {
		t.Fatal("blocker did not wait flag duration")
	}

	b.Unflag(addr)

	time.Sleep(checkTime)

	mux.Lock()
	_, ok := blocked[addr.ByteString()]
	mux.Unlock()

	if ok {
		t.Fatal("address should not be blocked")
	}
}

// blocklister is a test double whose Blocklist method forwards to a
// caller-supplied function.
type blocklister struct {
	// blocklistFunc is invoked by Blocklist with the same arguments.
	blocklistFunc func(swarm.Address, time.Duration, string) error
}

// mockBlockLister returns a blocklister whose Blocklist calls are served by f.
func mockBlockLister(f func(swarm.Address, time.Duration, string) error) *blocklister {
	m := new(blocklister)
	m.blocklistFunc = f
	return m
}

// Blocklist forwards the call to blocklistFunc and returns its result.
func (b *blocklister) Blocklist(addr swarm.Address, t time.Duration, r string) error {
	return b.blocklistFunc(addr, t, r)
}
6 changes: 5 additions & 1 deletion pkg/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,12 @@ type Service interface {

// Disconnecter groups the Disconnect method with the Blocklister interface.
type Disconnecter interface {
	// Disconnect takes the overlay address of a peer and a reason string.
	Disconnect(overlay swarm.Address, reason string) error
	Blocklister
}

// Blocklister is the blocklisting part of the p2p service, split out so that
// consumers that only need to blocklist peers can depend on it alone.
type Blocklister interface {
	// Blocklist will disconnect a peer and put it on a blocklist (blocking in & out connections) for provided duration.
	// Duration 0 is treated as an infinite duration.
	Blocklist(overlay swarm.Address, duration time.Duration, reason string) error
}

Expand Down
10 changes: 10 additions & 0 deletions pkg/topology/kademlia/kademlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/ethersphere/bee/pkg/addressbook"
"github.com/ethersphere/bee/pkg/blocker"
"github.com/ethersphere/bee/pkg/discovery"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
Expand All @@ -36,6 +37,10 @@ const (
addPeerBatchSize = 500

peerConnectionAttemptTimeout = 5 * time.Second // Timeout for establishing a new connection with peer.

flagTimeout = 5 * time.Minute // how long before blocking a flagged peer
blockDuration = time.Hour // how long to blocklist an unresponsive peer for
blockWorkerWakup = time.Second * 10 // wake up interval for the blocker worker
)

var (
Expand Down Expand Up @@ -108,6 +113,7 @@ type Kad struct {
bgBroadcastCtx context.Context
bgBroadcastCancel context.CancelFunc
bgBroadcastWg sync.WaitGroup
blocker *blocker.Blocker
}

// New returns a new Kademlia.
Expand Down Expand Up @@ -162,6 +168,7 @@ func New(
pruneFunc: o.PruneFunc,
pinger: pinger,
staticPeer: isStaticPeer(o.StaticNodes),
blocker: blocker.New(p2p, flagTimeout, blockDuration, blockWorkerWakup, logger),
}

if k.pruneFunc == nil {
Expand Down Expand Up @@ -542,8 +549,10 @@ func (k *Kad) recordPeerLatencies(ctx context.Context) {
l, err := k.pinger.Ping(ctx, addr, "ping")
if err != nil {
k.logger.Tracef("kademlia: cannot get latency for peer %s: %v", addr.String(), err)
k.blocker.Flag(addr)
return
}
k.blocker.Unflag(addr)
k.collector.Record(addr, im.PeerLatency(l))
v := k.collector.Inspect(addr).LatencyEWMA
k.metrics.PeerLatencyEWMA.Observe(v.Seconds())
Expand Down Expand Up @@ -1357,6 +1366,7 @@ func (k *Kad) Halt() {
func (k *Kad) Close() error {
k.logger.Info("kademlia shutting down")
close(k.quit)
_ = k.blocker.Close()
cc := make(chan struct{})

k.bgBroadcastCancel()
Expand Down