Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: kademlia blocklists peer on ping timeout #2543

Merged
merged 8 commits into from
Sep 29, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 135 additions & 0 deletions pkg/blocker/blocker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package blocker_test

import (
"io/ioutil"
"sync"
"testing"
"time"

"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/swarm/test"

"github.com/ethersphere/bee/pkg/blocker"
)

func TestBlocksAfterFlagTimeout(t *testing.T) {

defer func(t time.Duration) {
*blocker.WakeupTime = t
}(*blocker.WakeupTime)
*blocker.WakeupTime = time.Millisecond

mux := sync.Mutex{}
blocked := make(map[string]time.Duration)

mock := mockBlockLister(func(a swarm.Address, d time.Duration, r string) error {
mux.Lock()
blocked[a.ByteString()] = d
mux.Unlock()

return nil
})

logger := logging.New(ioutil.Discard, 0)

flagTime := time.Millisecond * 25
checkTime := time.Millisecond * 100
blockTime := time.Second

b := blocker.New(mock, flagTime, blockTime, logger)
defer b.Close()
// wait for worker loop to start
time.Sleep(time.Millisecond * 50)

addr := test.RandomAddress()
b.Flag(addr)

if _, ok := blocked[addr.ByteString()]; ok {
t.Fatal("blocker did not wait flag duration")
}

midway := time.After(flagTime / 2)
check := time.After(checkTime)

<-midway
b.Flag(addr) // check thats this flag call does not overide previous call
<-check

mux.Lock()
blockedTime, ok := blocked[addr.ByteString()]
mux.Unlock()
if !ok {
t.Fatal("address should be blocked")
}

if blockedTime != blockTime {
t.Fatalf("block time: want %v, got %v", blockTime, blockedTime)
}
}

func TestUnflagBeforeBlock(t *testing.T) {

defer func(t time.Duration) {
*blocker.WakeupTime = t
}(*blocker.WakeupTime)
*blocker.WakeupTime = time.Millisecond

mux := sync.Mutex{}
blocked := make(map[string]time.Duration)

mock := mockBlockLister(func(a swarm.Address, d time.Duration, r string) error {
mux.Lock()
blocked[a.ByteString()] = d
mux.Unlock()
return nil
})

logger := logging.New(ioutil.Discard, 0)

flagTime := time.Millisecond * 25
checkTime := time.Millisecond * 100
blockTime := time.Second

b := blocker.New(mock, flagTime, blockTime, logger)
defer b.Close()
// wait for worker loop to start
time.Sleep(time.Millisecond * 50)

addr := test.RandomAddress()
b.Flag(addr)

if _, ok := blocked[addr.ByteString()]; ok {
t.Fatal("blocker did not wait flag duration")
}

b.Unflag(addr)

time.Sleep(checkTime)

mux.Lock()
_, ok := blocked[addr.ByteString()]
mux.Unlock()

if ok {
t.Fatal("address should not be blocked")
}
}

type blocklister struct {
blocklistFunc func(swarm.Address, time.Duration, string) error
}

func mockBlockLister(f func(swarm.Address, time.Duration, string) error) *blocklister {
return &blocklister{
blocklistFunc: f,
}
}

func (b *blocklister) Blocklist(addr swarm.Address, t time.Duration, r string) error {
return b.blocklistFunc(addr, t, r)
}
109 changes: 109 additions & 0 deletions pkg/blocker/blockler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package blocker

import (
"sync"
"time"

"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
"github.com/ethersphere/bee/pkg/swarm"
)

type peer struct {
blockAfter time.Time // timestamp of the point we've timed-out or got an error from a peer
addr swarm.Address
}

var (
wakeupTime time.Duration = time.Second * 10
)

type Blocker struct {
mux sync.Mutex
disconnector p2p.Blocklister
flagTimeout time.Duration // how long before blocking a flagged peer
blockDuration time.Duration // how long to blocklist a bad peer
peers map[string]*peer
logger logging.Logger
wakeupCh chan struct{}
quit chan struct{}
}

func New(dis p2p.Blocklister, flagTimeout, blockDuration time.Duration, logger logging.Logger) *Blocker {

b := &Blocker{
disconnector: dis,
flagTimeout: flagTimeout,
blockDuration: blockDuration,
peers: map[string]*peer{},
wakeupCh: make(chan struct{}),
quit: make(chan struct{}),
logger: logger,
}

go b.run()

return b
}

func (b *Blocker) run() {

for {
select {
case <-b.quit:
return
case <-time.After(wakeupTime):
b.block()
}
}
}

func (b *Blocker) block() {
b.mux.Lock()
defer b.mux.Unlock()

for key, peer := range b.peers {
if !peer.blockAfter.IsZero() && time.Now().After(peer.blockAfter) {
if err := b.disconnector.Blocklist(peer.addr, b.blockDuration, "blocker: flag timeout"); err != nil {
b.logger.Warningf("blocker: blocking peer %s failed: %v", peer.addr, err)
}
delete(b.peers, key)
}
}
}

func (b *Blocker) Flag(addr swarm.Address) {
b.mux.Lock()
defer b.mux.Unlock()

p, ok := b.peers[addr.ByteString()]

if ok {
if p.blockAfter.IsZero() {
p.blockAfter = time.Now().Add(b.flagTimeout)
}
} else {
b.peers[addr.ByteString()] = &peer{
blockAfter: time.Now().Add(b.flagTimeout),
addr: addr,
}
}
}

func (b *Blocker) Unflag(addr swarm.Address) {
b.mux.Lock()
defer b.mux.Unlock()

delete(b.peers, addr.ByteString())
}

// Closed will exit the worker loop.
// must be called only once.
func (b *Blocker) Close() error {
b.quit <- struct{}{}
return nil
}
3 changes: 3 additions & 0 deletions pkg/blocker/export_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package blocker

var WakeupTime = &wakeupTime
6 changes: 5 additions & 1 deletion pkg/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,12 @@ type Service interface {

type Disconnecter interface {
Disconnect(overlay swarm.Address, reason string) error
Blocklister
}

type Blocklister interface {
// Blocklist will disconnect a peer and put it on a blocklist (blocking in & out connections) for provided duration
// duration 0 is treated as an infinite duration
// Duration 0 is treated as an infinite duration.
Blocklist(overlay swarm.Address, duration time.Duration, reason string) error
}

Expand Down
9 changes: 9 additions & 0 deletions pkg/topology/kademlia/kademlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/ethersphere/bee/pkg/addressbook"
"github.com/ethersphere/bee/pkg/blocker"
"github.com/ethersphere/bee/pkg/discovery"
"github.com/ethersphere/bee/pkg/logging"
"github.com/ethersphere/bee/pkg/p2p"
Expand All @@ -36,6 +37,9 @@ const (
addPeerBatchSize = 500

peerConnectionAttemptTimeout = 5 * time.Second // Timeout for establishing a new connection with peer.

flagTimeout = 5 * time.Minute // how long before blocking a flagged peer
blockDuration = time.Hour // how long to blocklist an unresponsive peer for
)

var (
Expand Down Expand Up @@ -108,6 +112,7 @@ type Kad struct {
bgBroadcastCtx context.Context
bgBroadcastCancel context.CancelFunc
bgBroadcastWg sync.WaitGroup
blocker *blocker.Blocker
}

// New returns a new Kademlia.
Expand Down Expand Up @@ -162,6 +167,7 @@ func New(
pruneFunc: o.PruneFunc,
pinger: pinger,
staticPeer: isStaticPeer(o.StaticNodes),
blocker: blocker.New(p2p, flagTimeout, blockDuration, logger),
}

if k.pruneFunc == nil {
Expand Down Expand Up @@ -542,8 +548,10 @@ func (k *Kad) recordPeerLatencies(ctx context.Context) {
l, err := k.pinger.Ping(ctx, addr, "ping")
if err != nil {
k.logger.Tracef("kademlia: cannot get latency for peer %s: %v", addr.String(), err)
k.blocker.Flag(addr)
return
}
k.blocker.Unflag(addr)
k.collector.Record(addr, im.PeerLatency(l))
v := k.collector.Inspect(addr).LatencyEWMA
k.metrics.PeerLatencyEWMA.Observe(v.Seconds())
Expand Down Expand Up @@ -1357,6 +1365,7 @@ func (k *Kad) Halt() {
func (k *Kad) Close() error {
k.logger.Info("kademlia shutting down")
close(k.quit)
_ = k.blocker.Close()
cc := make(chan struct{})

k.bgBroadcastCancel()
Expand Down