Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(header/p2p): implement retry mechanism for header/p2p session #1477

Merged
merged 7 commits into from
Dec 23, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 3 additions & 15 deletions header/p2p/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (ex *Exchange) GetRangeByHeight(ctx context.Context, from, amount uint64) (
if amount > ex.Params.MaxRequestSize {
return nil, header.ErrHeadersLimitExceeded
}
session := newSession(ex.ctx, ex.host, ex.peerTracker.peers(), ex.protocolID)
session := newSession(ex.ctx, ex.host, ex.peerTracker, ex.protocolID)
defer session.close()
return session.getRangeByHeight(ctx, from, amount, ex.Params.MaxHeadersPerRequest)
}
Expand All @@ -171,22 +171,10 @@ func (ex *Exchange) GetVerifiedRange(
from *header.ExtendedHeader,
amount uint64,
) ([]*header.ExtendedHeader, error) {
session := newSession(ex.ctx, ex.host, ex.peerTracker.peers(), ex.protocolID)
session := newSession(ex.ctx, ex.host, ex.peerTracker, ex.protocolID, withValidation(from))
defer session.close()

headers, err := session.getRangeByHeight(ctx, uint64(from.Height)+1, amount, ex.Params.MaxHeadersPerRequest)
if err != nil {
return nil, err
}

for _, h := range headers {
err := from.VerifyAdjacent(h)
if err != nil {
return nil, err
}
from = h
}
return headers, nil
return session.getRangeByHeight(ctx, uint64(from.Height)+1, amount, ex.Params.MaxHeadersPerRequest)
}

// Get performs a request for the ExtendedHeader by the given hash corresponding
Expand Down
62 changes: 39 additions & 23 deletions header/p2p/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,15 @@ func TestExchange_RequestVerifiedHeadersFails(t *testing.T) {
store.Headers[2] = store.Headers[3]
// perform expected request
h := store.Headers[1]
_, err := exchg.GetVerifiedRange(context.Background(), h, 3)
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
t.Cleanup(cancel)
_, err := exchg.GetVerifiedRange(ctx, h, 3)
require.Error(t, err)

// ensure that peer was added to the blacklist
peers := exchg.peerTracker.connGater.ListBlockedPeers()
require.Len(t, peers, 1)
require.True(t, hosts[1].ID() == peers[0])
}

// TestExchange_RequestFullRangeHeaders requests max amount of headers
Expand All @@ -96,7 +103,9 @@ func TestExchange_RequestFullRangeHeaders(t *testing.T) {
servers[index], err = NewExchangeServer(hosts[index], store, protocolSuffix)
require.NoError(t, err)
servers[index].Start(context.Background()) //nolint:errcheck
exchange.peerTracker.peerLk.Lock()
exchange.peerTracker.trackedPeers[hosts[index].ID()] = &peerStat{peerID: hosts[index].ID()}
exchange.peerTracker.peerLk.Unlock()
}

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
Expand All @@ -109,28 +118,35 @@ func TestExchange_RequestFullRangeHeaders(t *testing.T) {

// TestExchange_RequestHeadersFails tests that the Exchange instance will return
// header.ErrNotFound if it will not have requested header.
func TestExchange_RequestHeadersFails(t *testing.T) {
vgonkivs marked this conversation as resolved.
Show resolved Hide resolved
func TestExchange_RequestHeadersLimitExceeded(t *testing.T) {
renaynay marked this conversation as resolved.
Show resolved Hide resolved
hosts := createMocknet(t, 2)
exchg, _ := createP2PExAndServer(t, hosts[0], hosts[1])
tt := []struct {
amount uint64
expectedErr *error
}{
{
amount: 10,
expectedErr: &header.ErrNotFound,
},
{
amount: 600,
expectedErr: &header.ErrHeadersLimitExceeded,
},
}
for _, test := range tt {
// perform expected request
_, err := exchg.GetRangeByHeight(context.Background(), 1, test.amount)
require.Error(t, err)
require.ErrorAs(t, err, test.expectedErr)
}
_, err := exchg.GetRangeByHeight(context.Background(), 1, 600)
require.Error(t, err)
require.ErrorAs(t, err, &header.ErrHeadersLimitExceeded)
}

// TestExchange_RequestHeadersFromAnotherPeer tests that the Exchange instance will request range
// from another peer with lower score after receiving header.ErrNotFound
func TestExchange_RequestHeadersFromAnotherPeer(t *testing.T) {
hosts := createMocknet(t, 3)
// create client + server(it does not have needed headers)
exchg, _ := createP2PExAndServer(t, hosts[0], hosts[1])
// create one more server(with more headers in the store)
serverSideEx, err := NewExchangeServer(hosts[2], headerMock.NewStore(t, 10), "private")
require.NoError(t, err)
require.NoError(t, serverSideEx.Start(context.Background()))
t.Cleanup(func() {
serverSideEx.Stop(context.Background()) //nolint:errcheck
})
exchg.peerTracker.peerLk.Lock()
exchg.peerTracker.trackedPeers[hosts[2].ID()] = &peerStat{peerID: hosts[2].ID(), peerScore: 20}
exchg.peerTracker.peerLk.Unlock()
_, err = exchg.GetRangeByHeight(context.Background(), 5, 3)
require.NoError(t, err)
// ensure that peerScore for the second peer is changed
newPeerScore := exchg.peerTracker.trackedPeers[hosts[2].ID()].score()
require.NotEqual(t, 20, newPeerScore)
}

// TestExchange_RequestByHash tests that the Exchange instance can
Expand Down Expand Up @@ -285,7 +301,7 @@ func createMocknet(t *testing.T, amount int) []libhost.Host {
}

// createP2PExAndServer creates a Exchange with 5 headers already in its store.
func createP2PExAndServer(t *testing.T, host, tpeer libhost.Host) (header.Exchange, *headerMock.MockStore) {
func createP2PExAndServer(t *testing.T, host, tpeer libhost.Host) (*Exchange, *headerMock.MockStore) {
store := headerMock.NewStore(t, 5)
serverSideEx, err := NewExchangeServer(tpeer, store, "private")
require.NoError(t, err)
Expand All @@ -295,7 +311,7 @@ func createP2PExAndServer(t *testing.T, host, tpeer libhost.Host) (header.Exchan
require.NoError(t, err)
ex, err := NewExchange(host, []peer.ID{tpeer.ID()}, "private", connGater)
require.NoError(t, err)
ex.peerTracker.trackedPeers[tpeer.ID()] = &peerStat{peerID: tpeer.ID()}
ex.peerTracker.trackedPeers[tpeer.ID()] = &peerStat{peerID: tpeer.ID(), peerScore: 100}
require.NoError(t, ex.Start(context.Background()))

t.Cleanup(func() {
Expand Down
8 changes: 6 additions & 2 deletions header/p2p/peer_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ type peerStat struct {
func (p *peerStat) updateStats(amount uint64, time uint64) {
p.Lock()
defer p.Unlock()
var averageSpeed float32
averageSpeed := float32(amount)
if time != 0 {
averageSpeed = float32(amount / time)
averageSpeed /= float32(time)
}
if p.peerScore == 0.0 {
p.peerScore = averageSpeed
Expand Down Expand Up @@ -112,6 +112,10 @@ func newPeerQueue(ctx context.Context, stats []*peerStat) *peerQueue {
// in case if there are no peer available in current session, it blocks until
// the peer will be pushed in.
func (p *peerQueue) waitPop(ctx context.Context) *peerStat {
// TODO(vgonkivs): implement fallback solution for cases when peer queue is empty.
// As we discussed with @Wondertan there could be 2 possible solutions:
// * use libp2p.Discovery to find new peers outside peerTracker to request headers;
// * implement IWANT/IHAVE messaging system and start requesting ranges from the Peerstore;
vgonkivs marked this conversation as resolved.
Show resolved Hide resolved
select {
case <-ctx.Done():
return &peerStat{}
Expand Down
104 changes: 84 additions & 20 deletions header/p2p/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,55 @@ import (
p2p_pb "github.com/celestiaorg/celestia-node/header/p2p/pb"
)

var errUnmarshalling = errors.New("unmarshalling error")
vgonkivs marked this conversation as resolved.
Show resolved Hide resolved

type option func(*session)

func withValidation(from *header.ExtendedHeader) option {
return func(s *session) {
s.from = from
}
}

// session aims to divide a range of headers
// into several smaller requests among different peers.
type session struct {
ctx context.Context
cancel context.CancelFunc
host host.Host
protocolID protocol.ID
queue *peerQueue
// peerTracker contains discovered peers with records that describes their activity.
queue *peerQueue
peerTracker *peerTracker

// `from` is set when additional validation for range is needed.
// Otherwise, it will be nil.
from *header.ExtendedHeader
distractedm1nd marked this conversation as resolved.
Show resolved Hide resolved

reqCh chan *p2p_pb.ExtendedHeaderRequest
errCh chan error
ctx context.Context
cancel context.CancelFunc
reqCh chan *p2p_pb.ExtendedHeaderRequest
}

func newSession(ctx context.Context, h host.Host, peerTracker []*peerStat, protocolID protocol.ID) *session {
func newSession(
ctx context.Context,
h host.Host,
peerTracker *peerTracker,
protocolID protocol.ID,
options ...option,
) *session {
ctx, cancel := context.WithCancel(ctx)
return &session{
ctx: ctx,
cancel: cancel,
protocolID: protocolID,
host: h,
queue: newPeerQueue(ctx, peerTracker),
errCh: make(chan error),
ses := &session{
ctx: ctx,
cancel: cancel,
protocolID: protocolID,
host: h,
queue: newPeerQueue(ctx, peerTracker.peers()),
peerTracker: peerTracker,
}

for _, opt := range options {
opt(ses)
}
return ses
}

// GetRangeByHeight requests headers from different peers.
Expand All @@ -61,8 +86,6 @@ func (s *session) getRangeByHeight(
return nil, errors.New("header/p2p: exchange is closed")
case <-ctx.Done():
return nil, ctx.Err()
case err := <-s.errCh:
return nil, err
case res := <-result:
headers = append(headers, res...)
}
Expand Down Expand Up @@ -114,19 +137,29 @@ func (s *session) doRequest(
if err == context.Canceled || err == context.DeadlineExceeded {
return
}
log.Errorw("requesting headers from peer failed."+
"Retrying the request from different peer", "failed peer", stat.peerID, "err", err)
log.Errorw("requesting headers from peer failed.", "failed peer", stat.peerID, "err", err)
select {
case <-ctx.Done():
case <-s.ctx.Done():
// retry request
case s.reqCh <- req:
log.Debug("Retrying the request from different peer")
}
return
}

h, err := s.processResponse(r)
if err != nil {
s.errCh <- err
switch err {
case header.ErrNotFound:
default:
s.peerTracker.blockPeer(stat.peerID, err)
}
select {
case <-ctx.Done():
case <-s.ctx.Done():
case s.reqCh <- req:
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
}
return
}
log.Debugw("request headers from peer succeed ", "from", s.host.ID(), "pID", stat.peerID, "amount", req.Amount)
Expand All @@ -147,14 +180,45 @@ func (s *session) processResponse(responses []*p2p_pb.ExtendedHeaderResponse) ([
}
header, err := header.UnmarshalExtendedHeader(resp.Body)
if err != nil {
return nil, err
return nil, errUnmarshalling
}
headers = append(headers, header)
}
if len(headers) == 0 {
return nil, header.ErrNotFound
}
return headers, nil

err := s.validate(headers)
return headers, err
}

// validate checks that the received range of headers is valid against the provided header.
func (s *session) validate(headers []*header.ExtendedHeader) error {
// if `from` is empty, then additional validation for the header`s range is not needed.
if s.from == nil {
distractedm1nd marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

// verify that the first header in range is valid against the trusted header.
err := s.from.VerifyNonAdjacent(headers[0])
if err != nil {
return nil
}

if len(headers) == 1 {
return nil
}

trusted := headers[0]
// verify that the whole range is valid.
for i := 1; i < len(headers); i++ {
err = trusted.VerifyAdjacent(headers[i])
if err != nil {
return err
}
trusted = headers[i]
}
return nil
}

// prepareRequests converts incoming range into separate ExtendedHeaderRequest.
Expand Down