Skip to content

Commit

Permalink
feat(header/p2p): implement GetVerifiedRangeByHeight
Browse files Browse the repository at this point in the history
  • Loading branch information
vgonkivs committed Nov 1, 2022
1 parent 4e913e5 commit 53d9e8b
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 13 deletions.
1 change: 1 addition & 0 deletions header/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type Broadcaster interface {
// from the network.
type Exchange interface {
Getter
GetVerifiedRangeByHeight(context.Context, *ExtendedHeader, uint64) ([]*ExtendedHeader, error)
}

var (
Expand Down
5 changes: 5 additions & 0 deletions header/local/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ func (l *Exchange) GetRangeByHeight(ctx context.Context, origin, amount uint64)
return l.store.GetRangeByHeight(ctx, origin, origin+amount)
}

func (l *Exchange) GetVerifiedRangeByHeight(ctx context.Context, origin *header.ExtendedHeader, amount uint64,
) ([]*header.ExtendedHeader, error) {
return l.GetRangeByHeight(ctx, uint64(origin.Height+1), uint64(origin.Height)+amount)
}

func (l *Exchange) Get(ctx context.Context, hash bytes.HexBytes) (*header.ExtendedHeader, error) {
return l.store.Get(ctx, hash)
}
41 changes: 39 additions & 2 deletions header/p2p/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package p2p
import (
"bytes"
"context"
"errors"
"fmt"
"math/rand"
"sort"
Expand All @@ -12,6 +13,7 @@ import (
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
tmbytes "github.com/tendermint/tendermint/libs/bytes"

"github.com/celestiaorg/go-libp2p-messenger/serde"
Expand Down Expand Up @@ -54,12 +56,17 @@ func protocolID(protocolSuffix string) protocol.ID {
return protocol.ID(fmt.Sprintf("/header-ex/v0.0.3/%s", protocolSuffix))
}

func NewExchange(host host.Host, peers peer.IDSlice, protocolSuffix string) *Exchange {
func NewExchange(
host host.Host,
connGater *conngater.BasicConnectionGater,
peers peer.IDSlice,
protocolSuffix string,
) *Exchange {
return &Exchange{
host: host,
protocolID: protocolID(protocolSuffix),
trustedPeers: peers,
peerTracker: newPeerTracker(host),
peerTracker: newPeerTracker(host, connGater),
}
}

Expand Down Expand Up @@ -145,6 +152,36 @@ func (ex *Exchange) GetRangeByHeight(ctx context.Context, from, amount uint64) (
return session.getRangeByHeight(ctx, from, amount)
}

// GetVerifiedRangeByHeight performs a request for the given range of ExtendedHeaders to the network and ensures
// that returned headers are correct against the passed one.
func (ex *Exchange) GetVerifiedRangeByHeight(
ctx context.Context,
from *header.ExtendedHeader,
amount uint64,
) ([]*header.ExtendedHeader, error) {
session := newSession(ex.ctx, ex.host, ex.peerTracker.peers(), ex.protocolID)
headers, err := session.getRangeByHeight(ctx, uint64(from.Height+1), amount)
if err != nil {
return nil, err
}
fromHeight := from.Height + 1
for _, h := range headers {
if fromHeight != h.Height {
pid := session.peerByHeaderHeight(h.Height)
ex.peerTracker.blockPeer(pid)
return nil, errors.New("header/p2p: returned headers are not contiguous")
}
fromHeight++
err := from.VerifyNonAdjacent(h)
if err != nil {
pid := session.peerByHeaderHeight(h.Height)
ex.peerTracker.blockPeer(pid)
return nil, err
}
}
return headers, nil
}

// Get performs a request for the ExtendedHeader by the given hash corresponding
// to the RawHeader. Note that the ExtendedHeader must be verified thereafter.
func (ex *Exchange) Get(ctx context.Context, hash tmbytes.HexBytes) (*header.ExtendedHeader, error) {
Expand Down
4 changes: 2 additions & 2 deletions header/p2p/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestExchange_RequestFullRangeHeaders(t *testing.T) {
hosts := createMocknet(t, 9)
store := createStore(t, int(maxRequestSize))
protocolSuffix := "private"
exchange := NewExchange(hosts[len(hosts)-1], []peer.ID{}, protocolSuffix)
exchange := NewExchange(hosts[len(hosts)-1], nil, []peer.ID{}, protocolSuffix)
exchange.ctx, exchange.cancel = context.WithCancel(context.Background())
t.Cleanup(exchange.cancel)
servers := make([]*ExchangeServer, len(hosts)-1) // amount of servers is len(hosts)-1 because one peer acts as a client
Expand Down Expand Up @@ -257,7 +257,7 @@ func createP2PExAndServer(t *testing.T, host, tpeer libhost.Host) (header.Exchan
err := serverSideEx.Start(context.Background())
require.NoError(t, err)

exchange := NewExchange(host, []peer.ID{tpeer.ID()}, "private")
exchange := NewExchange(host, nil, []peer.ID{tpeer.ID()}, "private")
exchange.peerTracker.connectedPeers[tpeer.ID()] = &peerStat{peerID: tpeer.ID()}
exchange.Start(context.Background()) //nolint:errcheck

Expand Down
31 changes: 29 additions & 2 deletions header/p2p/peer_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
)

type peerTracker struct {
Expand All @@ -17,14 +18,16 @@ type peerTracker struct {
// so we can guarantee that peerQueue will return only active peer
disconnectedPeers map[peer.ID]*peerStat

host host.Host
host host.Host
connGater *conngater.BasicConnectionGater
}

func newPeerTracker(h host.Host) *peerTracker {
func newPeerTracker(h host.Host, c *conngater.BasicConnectionGater) *peerTracker {
return &peerTracker{
disconnectedPeers: make(map[peer.ID]*peerStat),
connectedPeers: make(map[peer.ID]*peerStat),
host: h,
connGater: c,
}
}

Expand Down Expand Up @@ -99,3 +102,27 @@ func (p *peerTracker) peers() []*peerStat {
}
return peers
}

// blockPeer removes peer from cache and blocks peer on the networking level.
func (p *peerTracker) blockPeer(pID peer.ID) {
p.Lock()
defer p.Unlock()

for _, pid := range p.connectedPeers {
if pid.peerID == pID {
delete(p.connectedPeers, pID)
return
}
}

for _, pid := range p.disconnectedPeers {
if pid.peerID == pID {
delete(p.disconnectedPeers, pID)
return
}
}

if err := p.connGater.BlockPeer(pID); err != nil {
log.Errorw("blocking peer err", "blockedPid", pID, "err", err)
}
}
42 changes: 36 additions & 6 deletions header/p2p/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"io"
"sort"
"sync"
"time"

"github.com/libp2p/go-libp2p-core/host"
Expand All @@ -22,6 +23,12 @@ const (
headersPerPeer uint64 = 64
)

// requestStat contains a range [from;to] that was requested from the peer.
type requestStat struct {
from int64
to int64
}

// session aims to divide a range of headers
// into several smaller requests among different peers.
type session struct {
Expand All @@ -33,15 +40,20 @@ type session struct {

reqCh chan *p2p_pb.ExtendedHeaderRequest
errCh chan error

cacheLk sync.Mutex
// requestsCache used to store ranges that will be requested from the peer.
requestsCache map[peer.ID][]requestStat
}

func newSession(ctx context.Context, h host.Host, peerTracker []*peerStat, protocolID protocol.ID) *session {
return &session{
ctx: ctx,
protocolID: protocolID,
host: h,
queue: newPeerQueue(peerTracker),
errCh: make(chan error),
ctx: ctx,
protocolID: protocolID,
host: h,
queue: newPeerQueue(peerTracker),
errCh: make(chan error),
requestsCache: make(map[peer.ID][]requestStat),
}
}

Expand All @@ -68,6 +80,9 @@ func (s *session) doRequest(
return
}
log.Debugw("request headers from peer succeed", "from", s.host.ID(), "pID", stat.peerID, "amount", req.Amount)
s.cacheLk.Lock()
s.requestsCache[stat.peerID] = append(s.requestsCache[stat.peerID], requestStat{h[0].Height, h[len(h)-1].Height})
s.cacheLk.Unlock()
// send headers to the channel, update peer stats and return peer to the queue, so it can be re-used in case
// if there are other requests awaiting
headers <- h
Expand Down Expand Up @@ -185,10 +200,25 @@ func (s *session) processResponse(responses []*p2p_pb.ExtendedHeaderResponse) ([
}
headers = append(headers, header)
}

if len(headers) == 0 {
return nil, header.ErrNotFound
}
return headers, nil
}

func (s *session) peerByHeaderHeight(height int64) peer.ID {
s.cacheLk.Lock()
defer s.cacheLk.Lock()
for pID, stat := range s.requestsCache {
for _, req := range stat {
if req.from >= height && req.to <= height {
return pID
}
}
}
panic("could not found in range")
}

// prepareRequests converts incoming range into separate ExtendedHeaderRequest.
func prepareRequests(from, amount, headersPerPeer uint64) []*p2p_pb.ExtendedHeaderRequest {
requests := make([]*p2p_pb.ExtendedHeaderRequest, 0)
Expand Down
5 changes: 5 additions & 0 deletions header/sync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,3 +215,8 @@ func (e *exchangeCountingHead) GetByHeight(ctx context.Context, u uint64) (*head
func (e *exchangeCountingHead) GetRangeByHeight(c context.Context, from, to uint64) ([]*header.ExtendedHeader, error) {
panic("implement me")
}

func (e *exchangeCountingHead) GetVerifiedRangeByHeight(c context.Context, from *header.ExtendedHeader, to uint64,
) ([]*header.ExtendedHeader, error) {
panic("implement me")
}
5 changes: 4 additions & 1 deletion nodebuilder/header/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
"go.uber.org/fx"

"github.com/celestiaorg/celestia-node/header"
Expand All @@ -27,12 +28,14 @@ func newP2PExchange(cfg Config) func(
modp2p.Bootstrappers,
modp2p.Network,
host.Host,
*conngater.BasicConnectionGater,
) (header.Exchange, error) {
return func(
lc fx.Lifecycle,
bpeers modp2p.Bootstrappers,
network modp2p.Network,
host host.Host,
connGater *conngater.BasicConnectionGater,
) (header.Exchange, error) {
peers, err := cfg.trustedPeers(bpeers)
if err != nil {
Expand All @@ -43,7 +46,7 @@ func newP2PExchange(cfg Config) func(
ids[index] = peer.ID
host.Peerstore().AddAddrs(peer.ID, peer.Addrs, peerstore.PermanentAddrTTL)
}
exchange := p2p.NewExchange(host, ids, string(network))
exchange := p2p.NewExchange(host, connGater, ids, string(network))
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
return exchange.Start(ctx)
Expand Down

0 comments on commit 53d9e8b

Please sign in to comment.