diff --git a/header/p2p/exchange.go b/header/p2p/exchange.go index f5c4a7b5df..d64f10d961 100644 --- a/header/p2p/exchange.go +++ b/header/p2p/exchange.go @@ -11,6 +11,7 @@ import ( "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" + "github.com/libp2p/go-libp2p/p2p/net/conngater" tmbytes "github.com/tendermint/tendermint/libs/bytes" "github.com/celestiaorg/celestia-node/header" @@ -42,6 +43,7 @@ func NewExchange( host host.Host, peers peer.IDSlice, protocolSuffix string, + connGater *conngater.BasicConnectionGater, opts ...Option[ClientParameters], ) (*Exchange, error) { params := DefaultClientParameters() @@ -60,6 +62,7 @@ func NewExchange( trustedPeers: peers, peerTracker: newPeerTracker( host, + connGater, params.MaxAwaitingTime, params.DefaultScore, params.MaxPeerTrackerSize, diff --git a/header/p2p/exchange_test.go b/header/p2p/exchange_test.go index 1aa8887907..c809c6c21d 100644 --- a/header/p2p/exchange_test.go +++ b/header/p2p/exchange_test.go @@ -6,8 +6,11 @@ import ( "testing" "time" + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/sync" libhost "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p/p2p/net/conngater" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -82,8 +85,10 @@ func TestExchange_RequestFullRangeHeaders(t *testing.T) { totalAmount := 80 store := createStore(t, totalAmount) protocolSuffix := "private" + connGater, err := conngater.NewBasicConnectionGater(sync.MutexWrap(datastore.NewMapDatastore())) + require.NoError(t, err) // create new exchange - exchange, err := NewExchange(hosts[len(hosts)-1], []peer.ID{}, protocolSuffix) + exchange, err := NewExchange(hosts[len(hosts)-1], []peer.ID{}, protocolSuffix, connGater) require.NoError(t, err) exchange.Params.MaxHeadersPerRequest = 10 exchange.ctx, exchange.cancel = context.WithCancel(context.Background()) @@ -288,8 +293,9 @@ func createP2PExAndServer(t *testing.T, host, tpeer libhost.Host) (header.Exchan require.NoError(t, err) err = serverSideEx.Start(context.Background()) require.NoError(t, err) - - ex, err := NewExchange(host, []peer.ID{tpeer.ID()}, "private") + connGater, err := conngater.NewBasicConnectionGater(sync.MutexWrap(datastore.NewMapDatastore())) + require.NoError(t, err) + ex, err := NewExchange(host, []peer.ID{tpeer.ID()}, "private", connGater) require.NoError(t, err) ex.peerTracker.trackedPeers[tpeer.ID()] = &peerStat{peerID: tpeer.ID()} require.NoError(t, ex.Start(context.Background())) diff --git a/header/p2p/peer_tracker.go b/header/p2p/peer_tracker.go index 3d5aa9c4f9..66870f59a0 100644 --- a/header/p2p/peer_tracker.go +++ b/header/p2p/peer_tracker.go @@ -9,13 +9,15 @@ import ( "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p/p2p/net/conngater" ) // gcCycle defines the duration after which the peerTracker starts removing peers. var gcCycle = time.Minute * 30 type peerTracker struct { - host host.Host + host host.Host + connGater *conngater.BasicConnectionGater peerLk sync.RWMutex // trackedPeers contains active peers that we can request to. @@ -43,6 +45,7 @@ type peerTracker struct { func newPeerTracker( h host.Host, + connGater *conngater.BasicConnectionGater, maxAwaitingTime time.Duration, defaultScore float32, maxPeerTrackerSize int, @@ -50,6 +53,7 @@ func newPeerTracker( ctx, cancel := context.WithCancel(context.Background()) return &peerTracker{ host: h, + connGater: connGater, disconnectedPeers: make(map[peer.ID]*peerStat), trackedPeers: make(map[peer.ID]*peerStat), maxAwaitingTime: maxAwaitingTime, @@ -190,3 +194,25 @@ func (p *peerTracker) stop() { <-p.done } } + +// blockPeer blocks a peer on the networking level and removes it from the local cache. +func (p *peerTracker) blockPeer(pID peer.ID) { + // add peer to the blacklist, so we can't connect to it in the future. + err := p.connGater.BlockPeer(pID) + if err != nil { + log.Errorw("header/p2p: blocking peer failed", "err", err, "pID", pID) + } + // close connections to peer. + err = p.host.Network().ClosePeer(pID) + if err != nil { + log.Errorw("header/p2p: closing connection with peer failed", "err", err, "pID", pID) + } + + log.Warnw("header/p2p: blocked peer", "pID", pID) + + p.peerLk.Lock() + defer p.peerLk.Unlock() + // remove peer from cache. + delete(p.trackedPeers, pID) + delete(p.disconnectedPeers, pID) +} diff --git a/header/p2p/peer_tracker_test.go b/header/p2p/peer_tracker_test.go index 788ddbcf40..13791b0b16 100644 --- a/header/p2p/peer_tracker_test.go +++ b/header/p2p/peer_tracker_test.go @@ -4,7 +4,10 @@ import ( "testing" "time" + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/sync" "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p/p2p/net/conngater" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -12,7 +15,9 @@ import ( func TestPeerTracker_GC(t *testing.T) { h := createMocknet(t, 1) gcCycle = time.Millisecond * 200 - p := newPeerTracker(h[0], time.Millisecond*1, 1, 5) + connGater, err := conngater.NewBasicConnectionGater(sync.MutexWrap(datastore.NewMapDatastore())) + require.NoError(t, err) + p := newPeerTracker(h[0], connGater, time.Millisecond*1, 1, 5) pid1 := peer.ID("peer1") pid2 := peer.ID("peer2") pid3 := peer.ID("peer3") @@ -31,3 +36,13 @@ func TestPeerTracker_GC(t *testing.T) { require.Nil(t, p.trackedPeers[pid1]) require.Nil(t, p.disconnectedPeers[pid3]) } + +func TestPeerTracker_BlockPeer(t *testing.T) { + h := createMocknet(t, 2) + connGater, err := conngater.NewBasicConnectionGater(sync.MutexWrap(datastore.NewMapDatastore())) + require.NoError(t, err) + p := newPeerTracker(h[0], connGater, time.Millisecond*1, 1, 5) + p.blockPeer(h[1].ID()) + require.Len(t, connGater.ListBlockedPeers(), 1) + require.True(t, connGater.ListBlockedPeers()[0] == h[1].ID()) +} diff --git a/nodebuilder/header/constructors.go b/nodebuilder/header/constructors.go index 9c3713b2bc..cd616c0a29 100644 --- a/nodebuilder/header/constructors.go +++ b/nodebuilder/header/constructors.go @@ -7,6 +7,7 @@ import ( "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" + "github.com/libp2p/go-libp2p/p2p/net/conngater" "go.uber.org/fx" "github.com/celestiaorg/celestia-node/header" @@ -32,6 +33,7 @@ func newP2PExchange(cfg Config) func( modp2p.Bootstrappers, modp2p.Network, host.Host, + *conngater.BasicConnectionGater, []p2p.Option[p2p.ClientParameters], ) (header.Exchange, error) { return func( @@ -39,6 +41,7 @@ func newP2PExchange(cfg Config) func( bpeers modp2p.Bootstrappers, network modp2p.Network, host host.Host, + conngater *conngater.BasicConnectionGater, opts []p2p.Option[p2p.ClientParameters], ) (header.Exchange, error) { peers, err := cfg.trustedPeers(bpeers) @@ -50,7 +53,7 @@ func newP2PExchange(cfg Config) func( ids[index] = peer.ID host.Peerstore().AddAddrs(peer.ID, peer.Addrs, peerstore.PermanentAddrTTL) } - exchange, err := p2p.NewExchange(host, ids, string(network), opts...) + exchange, err := p2p.NewExchange(host, ids, string(network), conngater, opts...) if err != nil { return nil, err } diff --git a/nodebuilder/header/module_test.go b/nodebuilder/header/module_test.go index 68b471f1d6..3dffc4b3ac 100644 --- a/nodebuilder/header/module_test.go +++ b/nodebuilder/header/module_test.go @@ -5,6 +5,7 @@ import ( "github.com/ipfs/go-datastore" "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/p2p/net/conngater" "github.com/stretchr/testify/require" "go.uber.org/fx" "go.uber.org/fx/fxtest" @@ -60,6 +61,9 @@ func TestConstructModule_ExchangeParams(t *testing.T) { return datastore.NewMapDatastore() }), ConstructModule(node.Light, &cfg), + fx.Provide(func(b datastore.Batching) (*conngater.BasicConnectionGater, error) { + return conngater.NewBasicConnectionGater(b) + }), fx.Invoke( func(e header.Exchange, server *p2p.ExchangeServer) { ex := e.(*p2p.Exchange)