Skip to content

Commit

Permalink
feat(header|p2p): add blacklisting to peerTracker (#1532)
Browse files Browse the repository at this point in the history
## Overview
Resolves #1531

## Checklist

- [x] New and updated code has appropriate documentation
- [x] New and updated code has new and/or updated testing
- [x] Required CI checks are passing
- [x] Visual proof for any user facing features like CLI or
documentation updates
- [x] Linked issues closed with keywords
  • Loading branch information
vgonkivs authored Dec 22, 2022
1 parent 7c05ade commit 6038341
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 6 deletions.
3 changes: 3 additions & 0 deletions header/p2p/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
tmbytes "github.com/tendermint/tendermint/libs/bytes"

"github.com/celestiaorg/celestia-node/header"
Expand Down Expand Up @@ -42,6 +43,7 @@ func NewExchange(
host host.Host,
peers peer.IDSlice,
protocolSuffix string,
connGater *conngater.BasicConnectionGater,
opts ...Option[ClientParameters],
) (*Exchange, error) {
params := DefaultClientParameters()
Expand All @@ -60,6 +62,7 @@ func NewExchange(
trustedPeers: peers,
peerTracker: newPeerTracker(
host,
connGater,
params.MaxAwaitingTime,
params.DefaultScore,
params.MaxPeerTrackerSize,
Expand Down
12 changes: 9 additions & 3 deletions header/p2p/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ import (
"testing"
"time"

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync"
libhost "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -82,8 +85,10 @@ func TestExchange_RequestFullRangeHeaders(t *testing.T) {
totalAmount := 80
store := createStore(t, totalAmount)
protocolSuffix := "private"
connGater, err := conngater.NewBasicConnectionGater(sync.MutexWrap(datastore.NewMapDatastore()))
require.NoError(t, err)
// create new exchange
exchange, err := NewExchange(hosts[len(hosts)-1], []peer.ID{}, protocolSuffix)
exchange, err := NewExchange(hosts[len(hosts)-1], []peer.ID{}, protocolSuffix, connGater)
require.NoError(t, err)
exchange.Params.MaxHeadersPerRequest = 10
exchange.ctx, exchange.cancel = context.WithCancel(context.Background())
Expand Down Expand Up @@ -288,8 +293,9 @@ func createP2PExAndServer(t *testing.T, host, tpeer libhost.Host) (header.Exchan
require.NoError(t, err)
err = serverSideEx.Start(context.Background())
require.NoError(t, err)

ex, err := NewExchange(host, []peer.ID{tpeer.ID()}, "private")
connGater, err := conngater.NewBasicConnectionGater(sync.MutexWrap(datastore.NewMapDatastore()))
require.NoError(t, err)
ex, err := NewExchange(host, []peer.ID{tpeer.ID()}, "private", connGater)
require.NoError(t, err)
ex.peerTracker.trackedPeers[tpeer.ID()] = &peerStat{peerID: tpeer.ID()}
require.NoError(t, ex.Start(context.Background()))
Expand Down
28 changes: 27 additions & 1 deletion header/p2p/peer_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import (
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
)

// gcCycle defines the duration after which the peerTracker starts removing peers.
var gcCycle = time.Minute * 30

type peerTracker struct {
host host.Host
host host.Host
connGater *conngater.BasicConnectionGater

peerLk sync.RWMutex
// trackedPeers contains active peers that we can request to.
Expand Down Expand Up @@ -43,13 +45,15 @@ type peerTracker struct {

func newPeerTracker(
h host.Host,
connGater *conngater.BasicConnectionGater,
maxAwaitingTime time.Duration,
defaultScore float32,
maxPeerTrackerSize int,
) *peerTracker {
ctx, cancel := context.WithCancel(context.Background())
return &peerTracker{
host: h,
connGater: connGater,
disconnectedPeers: make(map[peer.ID]*peerStat),
trackedPeers: make(map[peer.ID]*peerStat),
maxAwaitingTime: maxAwaitingTime,
Expand Down Expand Up @@ -190,3 +194,25 @@ func (p *peerTracker) stop() {
<-p.done
}
}

// blockPeer blocks a peer on the networking level and removes it from the local cache.
func (p *peerTracker) blockPeer(pID peer.ID) {
// add peer to the blacklist, so we can't connect to it in the future.
err := p.connGater.BlockPeer(pID)
if err != nil {
log.Errorw("header/p2p: blocking peer failed", "err", err, "pID", pID)
}
// close connections to peer.
err = p.host.Network().ClosePeer(pID)
if err != nil {
log.Errorw("header/p2p: closing connection with peer failed", "err", err, "pID", pID)
}

log.Warnw("header/p2p: blocked peer", "pID", pID)

p.peerLk.Lock()
defer p.peerLk.Unlock()
// remove peer from cache.
delete(p.trackedPeers, pID)
delete(p.disconnectedPeers, pID)
}
17 changes: 16 additions & 1 deletion header/p2p/peer_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,20 @@ import (
"testing"
"time"

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestPeerTracker_GC(t *testing.T) {
h := createMocknet(t, 1)
gcCycle = time.Millisecond * 200
p := newPeerTracker(h[0], time.Millisecond*1, 1, 5)
connGater, err := conngater.NewBasicConnectionGater(sync.MutexWrap(datastore.NewMapDatastore()))
require.NoError(t, err)
p := newPeerTracker(h[0], connGater, time.Millisecond*1, 1, 5)
pid1 := peer.ID("peer1")
pid2 := peer.ID("peer2")
pid3 := peer.ID("peer3")
Expand All @@ -31,3 +36,13 @@ func TestPeerTracker_GC(t *testing.T) {
require.Nil(t, p.trackedPeers[pid1])
require.Nil(t, p.disconnectedPeers[pid3])
}

func TestPeerTracker_BlockPeer(t *testing.T) {
h := createMocknet(t, 2)
connGater, err := conngater.NewBasicConnectionGater(sync.MutexWrap(datastore.NewMapDatastore()))
require.NoError(t, err)
p := newPeerTracker(h[0], connGater, time.Millisecond*1, 1, 5)
p.blockPeer(h[1].ID())
require.Len(t, connGater.ListBlockedPeers(), 1)
require.True(t, connGater.ListBlockedPeers()[0] == h[1].ID())
}
5 changes: 4 additions & 1 deletion nodebuilder/header/constructors.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
"go.uber.org/fx"

"github.com/celestiaorg/celestia-node/header"
Expand All @@ -32,13 +33,15 @@ func newP2PExchange(cfg Config) func(
modp2p.Bootstrappers,
modp2p.Network,
host.Host,
*conngater.BasicConnectionGater,
[]p2p.Option[p2p.ClientParameters],
) (header.Exchange, error) {
return func(
lc fx.Lifecycle,
bpeers modp2p.Bootstrappers,
network modp2p.Network,
host host.Host,
conngater *conngater.BasicConnectionGater,
opts []p2p.Option[p2p.ClientParameters],
) (header.Exchange, error) {
peers, err := cfg.trustedPeers(bpeers)
Expand All @@ -50,7 +53,7 @@ func newP2PExchange(cfg Config) func(
ids[index] = peer.ID
host.Peerstore().AddAddrs(peer.ID, peer.Addrs, peerstore.PermanentAddrTTL)
}
exchange, err := p2p.NewExchange(host, ids, string(network), opts...)
exchange, err := p2p.NewExchange(host, ids, string(network), conngater, opts...)
if err != nil {
return nil, err
}
Expand Down
4 changes: 4 additions & 0 deletions nodebuilder/header/module_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
"github.com/stretchr/testify/require"
"go.uber.org/fx"
"go.uber.org/fx/fxtest"
Expand Down Expand Up @@ -60,6 +61,9 @@ func TestConstructModule_ExchangeParams(t *testing.T) {
return datastore.NewMapDatastore()
}),
ConstructModule(node.Light, &cfg),
fx.Provide(func(b datastore.Batching) (*conngater.BasicConnectionGater, error) {
return conngater.NewBasicConnectionGater(b)
}),
fx.Invoke(
func(e header.Exchange, server *p2p.ExchangeServer) {
ex := e.(*p2p.Exchange)
Expand Down

0 comments on commit 6038341

Please sign in to comment.