From 898cb101242618452e391de9e06e8f95b76e9916 Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Mon, 28 Nov 2022 17:49:55 +0200 Subject: [PATCH 1/8] feat(header/p2p): implement GetVerifiedRangeByHeight --- header/interface.go | 1 + header/local/exchange.go | 5 ++++ header/p2p/exchange.go | 42 ++++++++++++++++++++++++++++-- header/p2p/exchange_test.go | 4 +-- header/p2p/peer_tracker.go | 31 ++++++++++++++++++++-- header/p2p/session.go | 31 +++++++++++++++++++++- header/sync/sync_test.go | 5 ++++ nodebuilder/header/constructors.go | 5 +++- 8 files changed, 116 insertions(+), 8 deletions(-) diff --git a/header/interface.go b/header/interface.go index 1273dea103..7a9dde0c13 100644 --- a/header/interface.go +++ b/header/interface.go @@ -58,6 +58,7 @@ type Broadcaster interface { // from the network. type Exchange interface { Getter + GetVerifiedRangeByHeight(context.Context, *ExtendedHeader, uint64) ([]*ExtendedHeader, error) } var ( diff --git a/header/local/exchange.go b/header/local/exchange.go index 9b284930a4..5c024ddc0e 100644 --- a/header/local/exchange.go +++ b/header/local/exchange.go @@ -43,6 +43,11 @@ func (l *Exchange) GetRangeByHeight(ctx context.Context, origin, amount uint64) return l.store.GetRangeByHeight(ctx, origin, origin+amount) } +func (l *Exchange) GetVerifiedRangeByHeight(ctx context.Context, origin *header.ExtendedHeader, amount uint64, +) ([]*header.ExtendedHeader, error) { + return l.GetRangeByHeight(ctx, uint64(origin.Height+1), uint64(origin.Height)+amount) +} + func (l *Exchange) Get(ctx context.Context, hash bytes.HexBytes) (*header.ExtendedHeader, error) { return l.store.Get(ctx, hash) } diff --git a/header/p2p/exchange.go b/header/p2p/exchange.go index 721fc1500d..9a5f27d201 100644 --- a/header/p2p/exchange.go +++ b/header/p2p/exchange.go @@ -3,6 +3,7 @@ package p2p import ( "bytes" "context" + "errors" "fmt" "math/rand" "sort" @@ -12,6 +13,7 @@ import ( "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" + "github.com/libp2p/go-libp2p/p2p/net/conngater" tmbytes "github.com/tendermint/tendermint/libs/bytes" "github.com/celestiaorg/go-libp2p-messenger/serde" @@ -54,12 +56,17 @@ func protocolID(protocolSuffix string) protocol.ID { return protocol.ID(fmt.Sprintf("/header-ex/v0.0.3/%s", protocolSuffix)) } -func NewExchange(host host.Host, peers peer.IDSlice, protocolSuffix string) *Exchange { +func NewExchange( + host host.Host, + connGater *conngater.BasicConnectionGater, + peers peer.IDSlice, + protocolSuffix string, +) *Exchange { return &Exchange{ host: host, protocolID: protocolID(protocolSuffix), trustedPeers: peers, - peerTracker: newPeerTracker(host), + peerTracker: newPeerTracker(host, connGater), } } @@ -151,6 +158,37 @@ func (ex *Exchange) GetRangeByHeight(ctx context.Context, from, amount uint64) ( return session.getRangeByHeight(ctx, from, amount) } +// GetVerifiedRangeByHeight performs a request for the given range of ExtendedHeaders to the network and ensures +// that returned headers are correct against the passed one. +func (ex *Exchange) GetVerifiedRangeByHeight( + ctx context.Context, + from *header.ExtendedHeader, + amount uint64, +) ([]*header.ExtendedHeader, error) { + session := newSession(ex.ctx, ex.host, ex.peerTracker.peers(), ex.protocolID) + defer session.close() + headers, err := session.getRangeByHeight(ctx, uint64(from.Height+1), amount) + if err != nil { + return nil, err + } + fromHeight := from.Height + 1 + for _, h := range headers { + if fromHeight != h.Height { + pid := session.peerByHeaderHeight(h.Height) + ex.peerTracker.blockPeer(pid) + return nil, errors.New("header/p2p: returned headers are not contiguous") + } + fromHeight++ + err := from.VerifyNonAdjacent(h) + if err != nil { + pid := session.peerByHeaderHeight(h.Height) + ex.peerTracker.blockPeer(pid) + return nil, err + } + } + return headers, nil +} + // Get performs a request for the ExtendedHeader by the given hash corresponding // to the RawHeader. Note that the ExtendedHeader must be verified thereafter. func (ex *Exchange) Get(ctx context.Context, hash tmbytes.HexBytes) (*header.ExtendedHeader, error) { diff --git a/header/p2p/exchange_test.go b/header/p2p/exchange_test.go index 8dfb8c3139..0129757a45 100644 --- a/header/p2p/exchange_test.go +++ b/header/p2p/exchange_test.go @@ -66,7 +66,7 @@ func TestExchange_RequestFullRangeHeaders(t *testing.T) { store := createStore(t, totalAmount) protocolSuffix := "private" // create new exchange - exchange := NewExchange(hosts[len(hosts)-1], []peer.ID{}, protocolSuffix) + exchange := NewExchange(hosts[len(hosts)-1], nil, []peer.ID{}, protocolSuffix) exchange.ctx, exchange.cancel = context.WithCancel(context.Background()) t.Cleanup(exchange.cancel) servers := make([]*ExchangeServer, len(hosts)-1) // amount of servers is len(hosts)-1 because one peer acts as a client @@ -265,7 +265,7 @@ func createP2PExAndServer(t *testing.T, host, tpeer libhost.Host) (header.Exchan err := serverSideEx.Start(context.Background()) require.NoError(t, err) - exchange := NewExchange(host, []peer.ID{tpeer.ID()}, "private") + exchange := NewExchange(host, nil, []peer.ID{tpeer.ID()}, "private") exchange.peerTracker.connectedPeers[tpeer.ID()] = &peerStat{peerID: tpeer.ID()} exchange.Start(context.Background()) //nolint:errcheck diff --git a/header/p2p/peer_tracker.go b/header/p2p/peer_tracker.go index 9451b6f929..2ea5740a70 100644 --- a/header/p2p/peer_tracker.go +++ b/header/p2p/peer_tracker.go @@ -8,6 +8,7 @@ import ( "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p/p2p/net/conngater" ) type peerTracker struct { @@ -17,14 +18,16 @@ type peerTracker struct { // so we can guarantee that peerQueue will only contain active peers disconnectedPeers map[peer.ID]*peerStat - host host.Host + host host.Host + connGater *conngater.BasicConnectionGater } -func newPeerTracker(h host.Host) *peerTracker { +func newPeerTracker(h host.Host, c *conngater.BasicConnectionGater) *peerTracker { return &peerTracker{ disconnectedPeers: make(map[peer.ID]*peerStat), connectedPeers: make(map[peer.ID]*peerStat), host: h, + connGater: c, } } @@ -102,3 +105,27 @@ func (p *peerTracker) peers() []*peerStat { } return peers } + +// blockPeer removes peer from cache and blocks peer on the networking level. +func (p *peerTracker) blockPeer(pID peer.ID) { + p.Lock() + defer p.Unlock() + + for _, pid := range p.connectedPeers { + if pid.peerID == pID { + delete(p.connectedPeers, pID) + return + } + } + + for _, pid := range p.disconnectedPeers { + if pid.peerID == pID { + delete(p.disconnectedPeers, pID) + return + } + } + + if err := p.connGater.BlockPeer(pID); err != nil { + log.Errorw("blocking peer err", "blockedPid", pID, "err", err) + } +} diff --git a/header/p2p/session.go b/header/p2p/session.go index 3f64f71173..15fc3a914c 100644 --- a/header/p2p/session.go +++ b/header/p2p/session.go @@ -5,6 +5,7 @@ import ( "errors" "io" "sort" + "sync" "time" "github.com/libp2p/go-libp2p-core/host" @@ -23,6 +24,12 @@ var ( headersPerPeer uint64 = 64 ) +// requestStat contains a range [from;to] that was requested from the peer. +type requestStat struct { + from int64 + to int64 +} + // session aims to divide a range of headers // into several smaller requests among different peers. type session struct { @@ -35,6 +42,10 @@ type session struct { reqCh chan *p2p_pb.ExtendedHeaderRequest errCh chan error + + cacheLk sync.Mutex + // requestsCache used to store ranges that will be requested from the peer. + requestsCache map[peer.ID][]requestStat } func newSession(ctx context.Context, h host.Host, peerTracker []*peerStat, protocolID protocol.ID) *session { @@ -184,6 +195,9 @@ func (s *session) requestHeaders( break } stream.Reset() //nolint:errcheck + if err == io.EOF { + break + } return nil, 0, 0, err } totalRequestSize += uint64(msgSize) @@ -210,10 +224,25 @@ func (s *session) processResponse(responses []*p2p_pb.ExtendedHeaderResponse) ([ } headers = append(headers, header) } - + if len(headers) == 0 { + return nil, header.ErrNotFound + } return headers, nil } +func (s *session) peerByHeaderHeight(height int64) peer.ID { + s.cacheLk.Lock() + defer s.cacheLk.Lock() + for pID, stat := range s.requestsCache { + for _, req := range stat { + if req.from >= height && req.to <= height { + return pID + } + } + } + panic("could not found in range") +} + // prepareRequests converts incoming range into separate ExtendedHeaderRequest. func prepareRequests(from, amount, headersPerPeer uint64) []*p2p_pb.ExtendedHeaderRequest { requests := make([]*p2p_pb.ExtendedHeaderRequest, 0, amount/headersPerPeer) diff --git a/header/sync/sync_test.go b/header/sync/sync_test.go index 4d65a576da..524cf69d23 100644 --- a/header/sync/sync_test.go +++ b/header/sync/sync_test.go @@ -215,3 +215,8 @@ func (e *exchangeCountingHead) GetByHeight(ctx context.Context, u uint64) (*head func (e *exchangeCountingHead) GetRangeByHeight(c context.Context, from, to uint64) ([]*header.ExtendedHeader, error) { panic("implement me") } + +func (e *exchangeCountingHead) GetVerifiedRangeByHeight(c context.Context, from *header.ExtendedHeader, to uint64, +) ([]*header.ExtendedHeader, error) { + panic("implement me") +} diff --git a/nodebuilder/header/constructors.go b/nodebuilder/header/constructors.go index 7d4d69f68a..25aeb86a1a 100644 --- a/nodebuilder/header/constructors.go +++ b/nodebuilder/header/constructors.go @@ -7,6 +7,7 @@ import ( "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" + "github.com/libp2p/go-libp2p/p2p/net/conngater" "go.uber.org/fx" "github.com/celestiaorg/celestia-node/header" @@ -27,12 +28,14 @@ func newP2PExchange(cfg Config) func( modp2p.Bootstrappers, modp2p.Network, host.Host, + *conngater.BasicConnectionGater, ) (header.Exchange, error) { return func( lc fx.Lifecycle, bpeers modp2p.Bootstrappers, network modp2p.Network, host host.Host, + connGater *conngater.BasicConnectionGater, ) (header.Exchange, error) { peers, err := cfg.trustedPeers(bpeers) if err != nil { @@ -43,7 +46,7 @@ func newP2PExchange(cfg Config) func( ids[index] = peer.ID host.Peerstore().AddAddrs(peer.ID, peer.Addrs, peerstore.PermanentAddrTTL) } - exchange := p2p.NewExchange(host, ids, string(network)) + exchange := p2p.NewExchange(host, connGater, ids, string(network)) lc.Append(fx.Hook{ OnStart: func(ctx context.Context) error { return exchange.Start(ctx) From 07d0dc728d7306e08c0d19602027b7effe0e0356 Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Tue, 29 Nov 2022 11:34:30 +0200 Subject: [PATCH 2/8] chore: remove request history and peer punishing --- header/p2p/exchange.go | 14 ++++---------- header/p2p/exchange_test.go | 4 ++-- header/p2p/peer_tracker.go | 31 ++---------------------------- header/p2p/session.go | 24 ----------------------- nodebuilder/header/constructors.go | 5 +---- 5 files changed, 9 insertions(+), 69 deletions(-) diff --git a/header/p2p/exchange.go b/header/p2p/exchange.go index 9a5f27d201..47969a4cf4 100644 --- a/header/p2p/exchange.go +++ b/header/p2p/exchange.go @@ -13,7 +13,6 @@ import ( "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" - "github.com/libp2p/go-libp2p/p2p/net/conngater" tmbytes "github.com/tendermint/tendermint/libs/bytes" "github.com/celestiaorg/go-libp2p-messenger/serde" @@ -58,7 +57,6 @@ func protocolID(protocolSuffix string) protocol.ID { func NewExchange( host host.Host, - connGater *conngater.BasicConnectionGater, peers peer.IDSlice, protocolSuffix string, ) *Exchange { @@ -66,7 +64,7 @@ func NewExchange( host: host, protocolID: protocolID(protocolSuffix), trustedPeers: peers, - peerTracker: newPeerTracker(host, connGater), + peerTracker: newPeerTracker(host), } } @@ -171,18 +169,14 @@ func (ex *Exchange) GetVerifiedRangeByHeight( if err != nil { return nil, err } - fromHeight := from.Height + 1 + nextHeight := from.Height + 1 for _, h := range headers { - if fromHeight != h.Height { - pid := session.peerByHeaderHeight(h.Height) - ex.peerTracker.blockPeer(pid) + if nextHeight != h.Height { return nil, errors.New("header/p2p: returned headers are not contiguous") } - fromHeight++ + nextHeight++ err := from.VerifyNonAdjacent(h) if err != nil { - pid := session.peerByHeaderHeight(h.Height) - ex.peerTracker.blockPeer(pid) return nil, err } } diff --git a/header/p2p/exchange_test.go b/header/p2p/exchange_test.go index 0129757a45..8dfb8c3139 100644 --- a/header/p2p/exchange_test.go +++ b/header/p2p/exchange_test.go @@ -66,7 +66,7 @@ func TestExchange_RequestFullRangeHeaders(t *testing.T) { store := createStore(t, totalAmount) protocolSuffix := "private" // create new exchange - exchange := NewExchange(hosts[len(hosts)-1], nil, []peer.ID{}, protocolSuffix) + exchange := NewExchange(hosts[len(hosts)-1], []peer.ID{}, protocolSuffix) exchange.ctx, exchange.cancel = context.WithCancel(context.Background()) t.Cleanup(exchange.cancel) servers := make([]*ExchangeServer, len(hosts)-1) // amount of servers is len(hosts)-1 because one peer acts as a client @@ -265,7 +265,7 @@ func createP2PExAndServer(t *testing.T, host, tpeer libhost.Host) (header.Exchan err := serverSideEx.Start(context.Background()) require.NoError(t, err) - exchange := NewExchange(host, nil, []peer.ID{tpeer.ID()}, "private") + exchange := NewExchange(host, []peer.ID{tpeer.ID()}, "private") exchange.peerTracker.connectedPeers[tpeer.ID()] = &peerStat{peerID: tpeer.ID()} exchange.Start(context.Background()) //nolint:errcheck diff --git a/header/p2p/peer_tracker.go b/header/p2p/peer_tracker.go index 2ea5740a70..9451b6f929 100644 --- a/header/p2p/peer_tracker.go +++ b/header/p2p/peer_tracker.go @@ -8,7 +8,6 @@ import ( "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" - "github.com/libp2p/go-libp2p/p2p/net/conngater" ) type peerTracker struct { @@ -18,16 +17,14 @@ type peerTracker struct { // so we can guarantee that peerQueue will only contain active peers disconnectedPeers map[peer.ID]*peerStat - host host.Host - connGater *conngater.BasicConnectionGater + host host.Host } -func newPeerTracker(h host.Host, c *conngater.BasicConnectionGater) *peerTracker { +func newPeerTracker(h host.Host) *peerTracker { return &peerTracker{ disconnectedPeers: make(map[peer.ID]*peerStat), connectedPeers: make(map[peer.ID]*peerStat), host: h, - connGater: c, } } @@ -105,27 +102,3 @@ func (p *peerTracker) peers() []*peerStat { } return peers } - -// blockPeer removes peer from cache and blocks peer on the networking level. -func (p *peerTracker) blockPeer(pID peer.ID) { - p.Lock() - defer p.Unlock() - - for _, pid := range p.connectedPeers { - if pid.peerID == pID { - delete(p.connectedPeers, pID) - return - } - } - - for _, pid := range p.disconnectedPeers { - if pid.peerID == pID { - delete(p.disconnectedPeers, pID) - return - } - } - - if err := p.connGater.BlockPeer(pID); err != nil { - log.Errorw("blocking peer err", "blockedPid", pID, "err", err) - } -} diff --git a/header/p2p/session.go b/header/p2p/session.go index 15fc3a914c..1cd5adea45 100644 --- a/header/p2p/session.go +++ b/header/p2p/session.go @@ -5,7 +5,6 @@ import ( "errors" "io" "sort" - "sync" "time" "github.com/libp2p/go-libp2p-core/host" @@ -24,12 +23,6 @@ var ( headersPerPeer uint64 = 64 ) -// requestStat contains a range [from;to] that was requested from the peer. -type requestStat struct { - from int64 - to int64 -} - // session aims to divide a range of headers // into several smaller requests among different peers. type session struct { @@ -42,10 +35,6 @@ type session struct { reqCh chan *p2p_pb.ExtendedHeaderRequest errCh chan error - - cacheLk sync.Mutex - // requestsCache used to store ranges that will be requested from the peer. - requestsCache map[peer.ID][]requestStat } func newSession(ctx context.Context, h host.Host, peerTracker []*peerStat, protocolID protocol.ID) *session { @@ -230,19 +219,6 @@ func (s *session) processResponse(responses []*p2p_pb.ExtendedHeaderResponse) ([ return headers, nil } -func (s *session) peerByHeaderHeight(height int64) peer.ID { - s.cacheLk.Lock() - defer s.cacheLk.Lock() - for pID, stat := range s.requestsCache { - for _, req := range stat { - if req.from >= height && req.to <= height { - return pID - } - } - } - panic("could not found in range") -} - // prepareRequests converts incoming range into separate ExtendedHeaderRequest. func prepareRequests(from, amount, headersPerPeer uint64) []*p2p_pb.ExtendedHeaderRequest { requests := make([]*p2p_pb.ExtendedHeaderRequest, 0, amount/headersPerPeer) diff --git a/nodebuilder/header/constructors.go b/nodebuilder/header/constructors.go index 25aeb86a1a..7d4d69f68a 100644 --- a/nodebuilder/header/constructors.go +++ b/nodebuilder/header/constructors.go @@ -7,7 +7,6 @@ import ( "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" - "github.com/libp2p/go-libp2p/p2p/net/conngater" "go.uber.org/fx" "github.com/celestiaorg/celestia-node/header" @@ -28,14 +27,12 @@ func newP2PExchange(cfg Config) func( modp2p.Bootstrappers, modp2p.Network, host.Host, - *conngater.BasicConnectionGater, ) (header.Exchange, error) { return func( lc fx.Lifecycle, bpeers modp2p.Bootstrappers, network modp2p.Network, host host.Host, - connGater *conngater.BasicConnectionGater, ) (header.Exchange, error) { peers, err := cfg.trustedPeers(bpeers) if err != nil { @@ -46,7 +43,7 @@ func newP2PExchange(cfg Config) func( ids[index] = peer.ID host.Peerstore().AddAddrs(peer.ID, peer.Addrs, peerstore.PermanentAddrTTL) } - exchange := p2p.NewExchange(host, connGater, ids, string(network)) + exchange := p2p.NewExchange(host, ids, string(network)) lc.Append(fx.Hook{ OnStart: func(ctx context.Context) error { return exchange.Start(ctx) From ef025cb77ba5b6d7564df6d831f426fa006b4df7 Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Tue, 29 Nov 2022 12:05:08 +0200 Subject: [PATCH 3/8] chore: move GetVerifiedRangeByHeight under Getter interface --- das/daser_test.go | 8 ++++++++ header/core/exchange.go | 15 +++++++++++++++ header/interface.go | 4 +++- header/local/exchange.go | 4 ++-- header/p2p/exchange.go | 12 ++++-------- header/p2p/exchange_test.go | 8 ++++++++ header/store/store.go | 23 +++++++++++++++++++++++ header/sync/sync_test.go | 2 +- 8 files changed, 64 insertions(+), 12 deletions(-) diff --git a/das/daser_test.go b/das/daser_test.go index ae43d28fbb..afc2c6b788 100644 --- a/das/daser_test.go +++ b/das/daser_test.go @@ -323,6 +323,14 @@ func (m getterStub) GetRangeByHeight(ctx context.Context, from, to uint64) ([]*h return nil, nil } +func (m getterStub) GetVerifiedRange( + context.Context, + *header.ExtendedHeader, + uint64, +) ([]*header.ExtendedHeader, error) { + return nil, nil +} + func (m getterStub) Get(context.Context, tmbytes.HexBytes) (*header.ExtendedHeader, error) { return nil, nil } diff --git a/header/core/exchange.go b/header/core/exchange.go index b3c6d917d0..5449863092 100644 --- a/header/core/exchange.go +++ b/header/core/exchange.go @@ -55,6 +55,21 @@ func (ce *Exchange) GetRangeByHeight(ctx context.Context, from, amount uint64) ( return headers, nil } +func (ce *Exchange) GetVerifiedRange(ctx context.Context, origin *header.ExtendedHeader, amount uint64, +) ([]*header.ExtendedHeader, error) { + headers, err := ce.GetRangeByHeight(ctx, uint64(origin.Height), amount) + if err != nil { + return nil, err + } + for _, h := range headers { + err := origin.VerifyNonAdjacent(h) + if err != nil { + return nil, err + } + } + return headers, nil +} + func (ce *Exchange) Get(ctx context.Context, hash tmbytes.HexBytes) (*header.ExtendedHeader, error) { log.Debugw("requesting header", "hash", hash.String()) block, err := ce.fetcher.GetBlockByHash(ctx, hash) diff --git a/header/interface.go b/header/interface.go index 7a9dde0c13..c472b03795 100644 --- a/header/interface.go +++ b/header/interface.go @@ -58,7 +58,6 @@ type Broadcaster interface { // from the network. type Exchange interface { Getter - GetVerifiedRangeByHeight(context.Context, *ExtendedHeader, uint64) ([]*ExtendedHeader, error) } var ( @@ -126,6 +125,9 @@ type Getter interface { // GetRangeByHeight returns the given range [from:to) of ExtendedHeaders. GetRangeByHeight(ctx context.Context, from, to uint64) ([]*ExtendedHeader, error) + + // GetVerifiedRange returns verified range from the provided ExtendedHeader to the provided height. + GetVerifiedRange(context.Context, *ExtendedHeader, uint64) ([]*ExtendedHeader, error) } // Head contains the behavior necessary for a component to retrieve diff --git a/header/local/exchange.go b/header/local/exchange.go index 5c024ddc0e..9f21d449ba 100644 --- a/header/local/exchange.go +++ b/header/local/exchange.go @@ -43,9 +43,9 @@ func (l *Exchange) GetRangeByHeight(ctx context.Context, origin, amount uint64) return l.store.GetRangeByHeight(ctx, origin, origin+amount) } -func (l *Exchange) GetVerifiedRangeByHeight(ctx context.Context, origin *header.ExtendedHeader, amount uint64, +func (l *Exchange) GetVerifiedRange(ctx context.Context, origin *header.ExtendedHeader, amount uint64, ) ([]*header.ExtendedHeader, error) { - return l.GetRangeByHeight(ctx, uint64(origin.Height+1), uint64(origin.Height)+amount) + return l.store.GetVerifiedRange(ctx, origin, uint64(origin.Height)+amount) } func (l *Exchange) Get(ctx context.Context, hash bytes.HexBytes) (*header.ExtendedHeader, error) { diff --git a/header/p2p/exchange.go b/header/p2p/exchange.go index 47969a4cf4..7b3cac4281 100644 --- a/header/p2p/exchange.go +++ b/header/p2p/exchange.go @@ -55,11 +55,7 @@ func protocolID(protocolSuffix string) protocol.ID { return protocol.ID(fmt.Sprintf("/header-ex/v0.0.3/%s", protocolSuffix)) } -func NewExchange( - host host.Host, - peers peer.IDSlice, - protocolSuffix string, -) *Exchange { +func NewExchange(host host.Host, peers peer.IDSlice, protocolSuffix string) *Exchange { return &Exchange{ host: host, protocolID: protocolID(protocolSuffix), @@ -156,16 +152,16 @@ func (ex *Exchange) GetRangeByHeight(ctx context.Context, from, amount uint64) ( return session.getRangeByHeight(ctx, from, amount) } -// GetVerifiedRangeByHeight performs a request for the given range of ExtendedHeaders to the network and ensures +// GetVerifiedRange performs a request for the given range of ExtendedHeaders to the network and ensures // that returned headers are correct against the passed one. -func (ex *Exchange) GetVerifiedRangeByHeight( +func (ex *Exchange) GetVerifiedRange( ctx context.Context, from *header.ExtendedHeader, amount uint64, ) ([]*header.ExtendedHeader, error) { session := newSession(ex.ctx, ex.host, ex.peerTracker.peers(), ex.protocolID) defer session.close() - headers, err := session.getRangeByHeight(ctx, uint64(from.Height+1), amount) + headers, err := session.getRangeByHeight(ctx, uint64(from.Height), amount) if err != nil { return nil, err } diff --git a/header/p2p/exchange_test.go b/header/p2p/exchange_test.go index 8dfb8c3139..d539751b5d 100644 --- a/header/p2p/exchange_test.go +++ b/header/p2p/exchange_test.go @@ -342,6 +342,14 @@ func (m *mockStore) GetRangeByHeight(ctx context.Context, from, to uint64) ([]*h return headers, nil } +func (m *mockStore) GetVerifiedRange( + ctx context.Context, + h *header.ExtendedHeader, + to uint64, +) ([]*header.ExtendedHeader, error) { + return m.GetRangeByHeight(ctx, uint64(h.Height)+1, to) +} + func (m *mockStore) Has(context.Context, tmbytes.HexBytes) (bool, error) { return false, nil } diff --git a/header/store/store.go b/header/store/store.go index 9587fa79f7..d351322d1a 100644 --- a/header/store/store.go +++ b/header/store/store.go @@ -245,6 +245,29 @@ func (s *Store) GetRangeByHeight(ctx context.Context, from, to uint64) ([]*heade return headers, nil } +func (s *Store) GetVerifiedRange( + ctx context.Context, + from *header.ExtendedHeader, + to uint64, +) ([]*header.ExtendedHeader, error) { + if uint64(from.Height) >= to { + return nil, fmt.Errorf("header/store: invalid range(%d,%d)", from.Height, to) + } + headers, err := s.GetRangeByHeight(ctx, uint64(from.Height), to) + if err != nil { + return nil, err + } + + for _, h := range headers { + err := from.VerifyNonAdjacent(h) + if err != nil { + return nil, err + } + } + + return headers, nil +} + func (s *Store) Has(ctx context.Context, hash tmbytes.HexBytes) (bool, error) { if ok := s.cache.Contains(hash.String()); ok { return ok, nil diff --git a/header/sync/sync_test.go b/header/sync/sync_test.go index 524cf69d23..e4041a77ae 100644 --- a/header/sync/sync_test.go +++ b/header/sync/sync_test.go @@ -216,7 +216,7 @@ func (e *exchangeCountingHead) GetRangeByHeight(c context.Context, from, to uint panic("implement me") } -func (e *exchangeCountingHead) GetVerifiedRangeByHeight(c context.Context, from *header.ExtendedHeader, to uint64, +func (e *exchangeCountingHead) GetVerifiedRange(c context.Context, from *header.ExtendedHeader, to uint64, ) ([]*header.ExtendedHeader, error) { panic("implement me") } From 4fdd06f330cbbf18927b7e32a63c3aeb617d721f Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Tue, 29 Nov 2022 18:30:52 +0200 Subject: [PATCH 4/8] fix: change verification from nonAdjancet to Adjacent + add tests --- header/core/exchange.go | 5 ++++- header/interface.go | 2 +- header/p2p/exchange.go | 16 +++++++--------- header/p2p/exchange_test.go | 19 +++++++++++++++++++ header/p2p/session.go | 3 --- header/store/store.go | 5 +++-- 6 files changed, 34 insertions(+), 16 deletions(-) diff --git a/header/core/exchange.go b/header/core/exchange.go index 5449863092..7ece36a42b 100644 --- a/header/core/exchange.go +++ b/header/core/exchange.go @@ -57,16 +57,19 @@ func (ce *Exchange) GetRangeByHeight(ctx context.Context, from, amount uint64) ( func (ce *Exchange) GetVerifiedRange(ctx context.Context, origin *header.ExtendedHeader, amount uint64, ) ([]*header.ExtendedHeader, error) { - headers, err := ce.GetRangeByHeight(ctx, uint64(origin.Height), amount) + headers, err := ce.GetRangeByHeight(ctx, uint64(origin.Height)+1, amount) if err != nil { return nil, err } + for _, h := range headers { err := origin.VerifyNonAdjacent(h) if err != nil { return nil, err } + origin = h } + return headers, nil } diff --git a/header/interface.go b/header/interface.go index c472b03795..96160ea952 100644 --- a/header/interface.go +++ b/header/interface.go @@ -127,7 +127,7 @@ type Getter interface { GetRangeByHeight(ctx context.Context, from, to uint64) ([]*ExtendedHeader, error) // GetVerifiedRange returns verified range from the provided ExtendedHeader to the provided height. - GetVerifiedRange(context.Context, *ExtendedHeader, uint64) ([]*ExtendedHeader, error) + GetVerifiedRange(ctx context.Context, origin *ExtendedHeader, to uint64) ([]*ExtendedHeader, error) } // Head contains the behavior necessary for a component to retrieve diff --git a/header/p2p/exchange.go b/header/p2p/exchange.go index 7b3cac4281..26b05c20b8 100644 --- a/header/p2p/exchange.go +++ b/header/p2p/exchange.go @@ -3,7 +3,6 @@ package p2p import ( "bytes" "context" - "errors" "fmt" "math/rand" "sort" @@ -156,26 +155,25 @@ func (ex *Exchange) GetRangeByHeight(ctx context.Context, from, amount uint64) ( // that returned headers are correct against the passed one. func (ex *Exchange) GetVerifiedRange( ctx context.Context, - from *header.ExtendedHeader, + origin *header.ExtendedHeader, amount uint64, ) ([]*header.ExtendedHeader, error) { session := newSession(ex.ctx, ex.host, ex.peerTracker.peers(), ex.protocolID) defer session.close() - headers, err := session.getRangeByHeight(ctx, uint64(from.Height), amount) + + headers, err := session.getRangeByHeight(ctx, uint64(origin.Height)+1, amount) if err != nil { return nil, err } - nextHeight := from.Height + 1 + for _, h := range headers { - if nextHeight != h.Height { - return nil, errors.New("header/p2p: returned headers are not contiguous") - } - nextHeight++ - err := from.VerifyNonAdjacent(h) + err := origin.VerifyAdjacent(h) if err != nil { return nil, err } + origin = h } + return headers, nil } diff --git a/header/p2p/exchange_test.go b/header/p2p/exchange_test.go index d539751b5d..e35a36d897 100644 --- a/header/p2p/exchange_test.go +++ b/header/p2p/exchange_test.go @@ -55,6 +55,25 @@ func TestExchange_RequestHeaders(t *testing.T) { } } +func TestExchange_RequestVerifiedHeaders(t *testing.T) { + hosts := createMocknet(t, 2) + exchg, store := createP2PExAndServer(t, hosts[0], hosts[1]) + // perform expected request + h := store.headers[1] + _, err := exchg.GetVerifiedRange(context.Background(), h, 3) + require.NoError(t, err) +} + +func TestExchange_RequestVerifiedHeadersFails(t *testing.T) { + hosts := createMocknet(t, 2) + exchg, store := createP2PExAndServer(t, hosts[0], hosts[1]) + store.headers[2] = store.headers[3] + // perform expected request + h := store.headers[1] + _, err := exchg.GetVerifiedRange(context.Background(), h, 3) + require.Error(t, err) +} + // TestExchange_RequestFullRangeHeaders requests max amount of headers // to verify how session will parallelize all requests. func TestExchange_RequestFullRangeHeaders(t *testing.T) { diff --git a/header/p2p/session.go b/header/p2p/session.go index 1cd5adea45..74889c9d1c 100644 --- a/header/p2p/session.go +++ b/header/p2p/session.go @@ -184,9 +184,6 @@ func (s *session) requestHeaders( break } stream.Reset() //nolint:errcheck - if err == io.EOF { - break - } return nil, 0, 0, err } totalRequestSize += uint64(msgSize) diff --git a/header/store/store.go b/header/store/store.go index d351322d1a..b644091504 100644 --- a/header/store/store.go +++ b/header/store/store.go @@ -253,16 +253,17 @@ func (s *Store) GetVerifiedRange( if uint64(from.Height) >= to { return nil, fmt.Errorf("header/store: invalid range(%d,%d)", from.Height, to) } - headers, err := s.GetRangeByHeight(ctx, uint64(from.Height), to) + headers, err := s.GetRangeByHeight(ctx, uint64(from.Height)+1, to) if err != nil { return nil, err } for _, h := range headers { - err := from.VerifyNonAdjacent(h) + err := from.VerifyAdjacent(h) if err != nil { return nil, err } + from = h } return headers, nil From dcab08ffee0e32cfbd41fe52fd4f4378e1faf4f2 Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Wed, 30 Nov 2022 14:10:43 +0200 Subject: [PATCH 5/8] chore: change comment --- header/interface.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/header/interface.go b/header/interface.go index 96160ea952..25b1b9661b 100644 --- a/header/interface.go +++ b/header/interface.go @@ -126,7 +126,8 @@ type Getter interface { // GetRangeByHeight returns the given range [from:to) of ExtendedHeaders. GetRangeByHeight(ctx context.Context, from, to uint64) ([]*ExtendedHeader, error) - // GetVerifiedRange returns verified range from the provided ExtendedHeader to the provided height. + // GetVerifiedRange requests the header range from the provided ExtendedHeader and + // verifies that the returned headers are adjacent to each other. GetVerifiedRange(ctx context.Context, origin *ExtendedHeader, to uint64) ([]*ExtendedHeader, error) } From 14dac2cb7cda10572f1e6ce4794102327b1e179b Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Wed, 30 Nov 2022 15:02:14 +0200 Subject: [PATCH 6/8] fix: fix core exchange --- header/core/exchange.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/header/core/exchange.go b/header/core/exchange.go index 7ece36a42b..134fe6dbbd 100644 --- a/header/core/exchange.go +++ b/header/core/exchange.go @@ -63,7 +63,7 @@ func (ce *Exchange) GetVerifiedRange(ctx context.Context, origin *header.Extende } for _, h := range headers { - err := origin.VerifyNonAdjacent(h) + err := origin.VerifyAdjacent(h) if err != nil { return nil, err } From 1b342501c6386f0219058b2dd1338acd380a7f7e Mon Sep 17 00:00:00 2001 From: Viacheslav Date: Wed, 30 Nov 2022 15:55:21 +0200 Subject: [PATCH 7/8] chore: apply suggestions Co-authored-by: rene <41963722+renaynay@users.noreply.github.com> --- header/core/exchange.go | 1 - header/p2p/exchange.go | 1 - header/store/store.go | 1 - 3 files changed, 3 deletions(-) diff --git a/header/core/exchange.go b/header/core/exchange.go index 134fe6dbbd..0fa5dc053f 100644 --- a/header/core/exchange.go +++ b/header/core/exchange.go @@ -69,7 +69,6 @@ func (ce *Exchange) GetVerifiedRange(ctx context.Context, origin *header.Extende } origin = h } - return headers, nil } diff --git a/header/p2p/exchange.go b/header/p2p/exchange.go index 26b05c20b8..07417cd93b 100644 --- a/header/p2p/exchange.go +++ b/header/p2p/exchange.go @@ -173,7 +173,6 @@ func (ex *Exchange) GetVerifiedRange( } origin = h } - return headers, nil } diff --git a/header/store/store.go b/header/store/store.go index b644091504..42be50e955 100644 --- a/header/store/store.go +++ b/header/store/store.go @@ -265,7 +265,6 @@ func (s *Store) GetVerifiedRange( } from = h } - return headers, nil } From 9669a31dfa689f77a44526aaf555a3220e4857e6 Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Wed, 30 Nov 2022 16:27:07 +0200 Subject: [PATCH 8/8] chore: rename params --- das/daser_test.go | 2 +- header/core/exchange.go | 8 ++++---- header/interface.go | 6 +++--- header/local/exchange.go | 4 ++-- header/p2p/exchange.go | 8 ++++---- header/sync/sync_test.go | 7 +++++-- 6 files changed, 19 insertions(+), 16 deletions(-) diff --git a/das/daser_test.go b/das/daser_test.go index afc2c6b788..fac628418b 100644 --- a/das/daser_test.go +++ b/das/daser_test.go @@ -319,7 +319,7 @@ func (m getterStub) GetByHeight(_ context.Context, height uint64) (*header.Exten DAH: &header.DataAvailabilityHeader{RowsRoots: make([][]byte, 0)}}, nil } -func (m getterStub) GetRangeByHeight(ctx context.Context, from, to uint64) ([]*header.ExtendedHeader, error) { +func (m getterStub) GetRangeByHeight(ctx context.Context, from, amount uint64) ([]*header.ExtendedHeader, error) { return nil, nil } diff --git a/header/core/exchange.go b/header/core/exchange.go index 0fa5dc053f..9689f84f00 100644 --- a/header/core/exchange.go +++ b/header/core/exchange.go @@ -55,19 +55,19 @@ func (ce *Exchange) GetRangeByHeight(ctx context.Context, from, amount uint64) ( return headers, nil } -func (ce *Exchange) GetVerifiedRange(ctx context.Context, origin *header.ExtendedHeader, amount uint64, +func (ce *Exchange) GetVerifiedRange(ctx context.Context, from *header.ExtendedHeader, amount uint64, ) ([]*header.ExtendedHeader, error) { - headers, err := ce.GetRangeByHeight(ctx, uint64(origin.Height)+1, amount) + headers, err := ce.GetRangeByHeight(ctx, uint64(from.Height)+1, amount) if err != nil { return nil, err } for _, h := range headers { - err := origin.VerifyAdjacent(h) + err := from.VerifyAdjacent(h) if err != nil { return nil, err } - origin = h + from = h } return headers, nil } diff --git a/header/interface.go b/header/interface.go index 25b1b9661b..b6a439f4f1 100644 --- a/header/interface.go +++ b/header/interface.go @@ -123,12 +123,12 @@ type Getter interface { // GetByHeight returns the ExtendedHeader corresponding to the given block height. GetByHeight(context.Context, uint64) (*ExtendedHeader, error) - // GetRangeByHeight returns the given range [from:to) of ExtendedHeaders. - GetRangeByHeight(ctx context.Context, from, to uint64) ([]*ExtendedHeader, error) + // GetRangeByHeight returns the given range of ExtendedHeaders. + GetRangeByHeight(ctx context.Context, from, amount uint64) ([]*ExtendedHeader, error) // GetVerifiedRange requests the header range from the provided ExtendedHeader and // verifies that the returned headers are adjacent to each other. - GetVerifiedRange(ctx context.Context, origin *ExtendedHeader, to uint64) ([]*ExtendedHeader, error) + GetVerifiedRange(ctx context.Context, from *ExtendedHeader, amount uint64) ([]*ExtendedHeader, error) } // Head contains the behavior necessary for a component to retrieve diff --git a/header/local/exchange.go b/header/local/exchange.go index 9f21d449ba..3bd978f483 100644 --- a/header/local/exchange.go +++ b/header/local/exchange.go @@ -43,9 +43,9 @@ func (l *Exchange) GetRangeByHeight(ctx context.Context, origin, amount uint64) return l.store.GetRangeByHeight(ctx, origin, origin+amount) } -func (l *Exchange) GetVerifiedRange(ctx context.Context, origin *header.ExtendedHeader, amount uint64, +func (l *Exchange) GetVerifiedRange(ctx context.Context, from *header.ExtendedHeader, amount uint64, ) ([]*header.ExtendedHeader, error) { - return l.store.GetVerifiedRange(ctx, origin, uint64(origin.Height)+amount) + return l.store.GetVerifiedRange(ctx, from, uint64(from.Height)+amount) } func (l *Exchange) Get(ctx context.Context, hash bytes.HexBytes) (*header.ExtendedHeader, error) { diff --git a/header/p2p/exchange.go b/header/p2p/exchange.go index 07417cd93b..41cb4d8925 100644 --- a/header/p2p/exchange.go +++ b/header/p2p/exchange.go @@ -155,23 +155,23 @@ func (ex *Exchange) GetRangeByHeight(ctx context.Context, from, amount uint64) ( // that returned headers are correct against the passed one. func (ex *Exchange) GetVerifiedRange( ctx context.Context, - origin *header.ExtendedHeader, + from *header.ExtendedHeader, amount uint64, ) ([]*header.ExtendedHeader, error) { session := newSession(ex.ctx, ex.host, ex.peerTracker.peers(), ex.protocolID) defer session.close() - headers, err := session.getRangeByHeight(ctx, uint64(origin.Height)+1, amount) + headers, err := session.getRangeByHeight(ctx, uint64(from.Height)+1, amount) if err != nil { return nil, err } for _, h := range headers { - err := origin.VerifyAdjacent(h) + err := from.VerifyAdjacent(h) if err != nil { return nil, err } - origin = h + from = h } return headers, nil } diff --git a/header/sync/sync_test.go b/header/sync/sync_test.go index e4041a77ae..4324925f56 100644 --- a/header/sync/sync_test.go +++ b/header/sync/sync_test.go @@ -212,11 +212,14 @@ func (e *exchangeCountingHead) GetByHeight(ctx context.Context, u uint64) (*head panic("implement me") } -func (e *exchangeCountingHead) GetRangeByHeight(c context.Context, from, to uint64) ([]*header.ExtendedHeader, error) { +func (e *exchangeCountingHead) GetRangeByHeight( + c context.Context, + from, amount uint64, +) ([]*header.ExtendedHeader, error) { panic("implement me") } -func (e *exchangeCountingHead) GetVerifiedRange(c context.Context, from *header.ExtendedHeader, to uint64, +func (e *exchangeCountingHead) GetVerifiedRange(c context.Context, from *header.ExtendedHeader, amount uint64, ) ([]*header.ExtendedHeader, error) { panic("implement me") }