Skip to content

Commit

Permalink
feat(sync): check expired bundles (#1618)
Browse files Browse the repository at this point in the history
  • Loading branch information
b00f authored Nov 28, 2024
1 parent f4d34ef commit 8abf1b5
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 1 deletion.
3 changes: 3 additions & 0 deletions sync/firewall/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ var ErrGossipMessage = errors.New("receive stream message as gossip message")
// ErrStreamMessage is returned when a gossip message sends as stream message.
var ErrStreamMessage = errors.New("receive gossip message as stream message")

// ErrExpiredMessage is returned when we receive a expired message from a peer.
var ErrExpiredMessage = errors.New("received an expired message")

// ErrNetworkMismatch is returned when the bundle doesn't belong to this network.
var ErrNetworkMismatch = errors.New("bundle is not for this network")

Expand Down
44 changes: 43 additions & 1 deletion sync/firewall/firewall.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/pactus-project/pactus/network"
"github.com/pactus-project/pactus/state"
"github.com/pactus-project/pactus/sync/bundle"
"github.com/pactus-project/pactus/sync/bundle/message"
"github.com/pactus-project/pactus/sync/peerset"
"github.com/pactus-project/pactus/sync/peerset/peer"
"github.com/pactus-project/pactus/sync/peerset/peer/status"
Expand Down Expand Up @@ -59,7 +60,7 @@ func NewFirewall(conf *Config, network network.Network, peerSet *peerset.PeerSet
func (f *Firewall) OpenGossipBundle(data []byte, from peer.ID) (*bundle.Bundle, error) {
bdl, err := f.openBundle(bytes.NewReader(data), from)
if err != nil {
return nil, err
return bdl, err
}

if !bdl.Message.ShouldBroadcast() {
Expand Down Expand Up @@ -132,11 +133,20 @@ func (f *Firewall) openBundle(r io.Reader, from peer.ID) (*bundle.Bundle, error)

bdl, bytesRead, err := f.decodeBundle(r)
if err != nil {
f.closeConnection(from)
f.peerSet.UpdateInvalidMetric(from, int64(bytesRead))

return nil, err
}

if f.isExpired(bdl) {
f.logger.Info("drop expired bundle", "bundle", bdl)
f.closeConnection(from)
f.peerSet.UpdateInvalidMetric(from, int64(bytesRead))

return bdl, ErrExpiredMessage
}

if err := f.checkBundle(bdl); err != nil {
f.peerSet.UpdateInvalidMetric(from, int64(bytesRead))

Expand All @@ -158,6 +168,38 @@ func (*Firewall) decodeBundle(r io.Reader) (*bundle.Bundle, int, error) {
return bdl, bytesRead, nil
}

func (f *Firewall) isExpired(bdl *bundle.Bundle) bool {
curHeight := f.state.LastBlockHeight()
msg := bdl.Message
thresholdHeight := curHeight - 3

switch msg.Type() {
case message.TypeQueryProposal:
return msg.(*message.QueryProposalMessage).Height < thresholdHeight

case message.TypeProposal:
return msg.(*message.ProposalMessage).Proposal.Height() < thresholdHeight

case message.TypeQueryVote:
return msg.(*message.QueryVoteMessage).Height < thresholdHeight

case message.TypeVote:
return msg.(*message.VoteMessage).Vote.Height() < thresholdHeight

case message.TypeBlockAnnounce:
return msg.(*message.BlockAnnounceMessage).Height() < thresholdHeight

case message.TypeBlocksRequest:
case message.TypeBlocksResponse:
case message.TypeHello:
case message.TypeHelloAck:
case message.TypeTransaction:
return false
}

return false
}

func (f *Firewall) checkBundle(bdl *bundle.Bundle) error {
if err := bdl.BasicCheck(); err != nil {
return err
Expand Down

0 comments on commit 8abf1b5

Please sign in to comment.