Skip to content

Commit

Permalink
knode/validator: added instruction to reset the state (#814)
Browse files Browse the repository at this point in the history
  • Loading branch information
rgeraldes authored and yourheropaul committed Oct 8, 2018
1 parent 89d18e2 commit 4dd553a
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 36 deletions.
12 changes: 9 additions & 3 deletions client/knode/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,11 +622,15 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if !pm.validator.Validating() {
break
}

// Retrieve and decode the propagated proposal
var proposal types.Proposal
if err := msg.Decode(&proposal); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}

p.MarkProposal(proposal.Hash())

if err := pm.validator.AddProposal(&proposal); err != nil {
// ignore
break
Expand All @@ -643,6 +647,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}

p.MarkVote(vote.Hash())

if err := pm.validator.AddVote(&vote); err != nil {
// ignore
break
Expand All @@ -659,7 +664,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}

p.MarkFragment(request.Data.Proof)
p.MarkBlockFragment(request.Data.Proof)

if err := pm.validator.AddBlockFragment(request.BlockNumber, request.Round, request.Data); err != nil {
log.Error("error while adding a new block fragment", "err", err, "round", request.Round, "block", request.BlockNumber, "fragment", request.Data)
// ignore
Expand Down Expand Up @@ -733,11 +739,11 @@ func (pm *ProtocolManager) proposalBroadcastLoop() {
for obj := range pm.proposalSub.Chan() {
switch ev := obj.Data.(type) {
case core.NewProposalEvent:
for _, peer := range pm.peers.Peers() {
for _, peer := range pm.peers.PeersWithoutProposal(ev.Proposal.Hash()) {
peer.SendNewProposal(ev.Proposal)
}
case core.NewBlockFragmentEvent:
for _, peer := range pm.peers.PeersWithoutFragment(ev.Data.Proof) {
for _, peer := range pm.peers.PeersWithoutBlockFragment(ev.Data.Proof) {
peer.SendBlockFragment(ev.BlockNumber, ev.Round, ev.Data)
}
}
Expand Down
93 changes: 62 additions & 31 deletions client/knode/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ const (
// above some healthy uncle limit, so use that.
maxQueuedAnns = 4

maxKnownVotes = 1024 // Maximum vote hashes to keep in the known list (prevent DOS)
maxKnownFragments = 1024 // Maximum vote hashes to keep in the known list (prevent DOS)
maxKnownProposals = 1024 // Maximum proposal hashes to keep in the known list (prevent DOS)
maxKnownBlockFragments = 1024 // Maximum block fragment hashes to keep in the known list (prevent DOS)
maxKnownVotes = 1024 // Maximum vote hashes to keep in the known list (prevent DOS)

handshakeTimeout = 5 * time.Second
)
Expand Down Expand Up @@ -71,10 +72,13 @@ type peer struct {
head common.Hash
lock sync.RWMutex

knownTxs *set.Set // Set of transaction hashes known to be known by this peer
knownBlocks *set.Set // Set of block hashes known to be known by this peer
knownVotes *set.Set // set of vote hashes known to be known by this peer
knownFragments *set.Set // set of fragment hashes known to be known by this peer
knownTxs *set.Set
knownBlocks *set.Set

// consensus
knownProposals *set.Set
knownBlockFragments *set.Set
knownVotes *set.Set

queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer
queuedProps chan *propEvent // Queue of blocks to broadcast to the peer
Expand All @@ -84,18 +88,19 @@ type peer struct {

func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
return &peer{
Peer: p,
rw: rw,
version: version,
id: fmt.Sprintf("%x", p.ID().Bytes()[:8]),
knownTxs: set.New(),
knownBlocks: set.New(),
knownVotes: set.New(),
knownFragments: set.New(),
queuedTxs: make(chan []*types.Transaction, maxQueuedTxs),
queuedProps: make(chan *propEvent, maxQueuedProps),
queuedAnns: make(chan *types.Block, maxQueuedAnns),
term: make(chan struct{}),
Peer: p,
rw: rw,
version: version,
id: fmt.Sprintf("%x", p.ID().Bytes()[:8]),
knownTxs: set.New(),
knownBlocks: set.New(),
knownProposals: set.New(),
knownBlockFragments: set.New(),
knownVotes: set.New(),
queuedTxs: make(chan []*types.Transaction, maxQueuedTxs),
queuedProps: make(chan *propEvent, maxQueuedProps),
queuedAnns: make(chan *types.Block, maxQueuedAnns),
term: make(chan struct{}),
}
}

Expand Down Expand Up @@ -174,6 +179,26 @@ func (p *peer) MarkBlock(hash common.Hash) {
p.knownBlocks.Add(hash)
}

// MarkProposal marks a proposal as known for the peer, ensuring that the
// proposal will never be propagated to this particular peer.
func (p *peer) MarkProposal(hash common.Hash) {
// If we reached the memory allowance, drop a previously known fragment hash
for p.knownProposals.Size() >= maxKnownProposals {
p.knownProposals.Pop()
}
p.knownProposals.Add(hash)
}

// MarkFragment marks a block fragment as known for the peer, ensuring that the
// fragment will never be propagated to this particular peer.
func (p *peer) MarkBlockFragment(hash common.Hash) {
// If we reached the memory allowance, drop a previously known fragment hash
for p.knownBlockFragments.Size() >= maxKnownBlockFragments {
p.knownBlockFragments.Pop()
}
p.knownBlockFragments.Add(hash)
}

// MarkVote marks a vote as known for the peer, ensuring that the block will
// never be propagated to this particular peer.
func (p *peer) MarkVote(hash common.Hash) {
Expand All @@ -184,16 +209,6 @@ func (p *peer) MarkVote(hash common.Hash) {
p.knownVotes.Add(hash)
}

// MarkFragment marks a block fragment as known for the peer, ensuring that the
// fragment will never be propagated to this particular peer.
func (p *peer) MarkFragment(hash common.Hash) {
// If we reached the memory allowance, drop a previously known fragment hash
for p.knownFragments.Size() >= maxKnownFragments {
p.knownFragments.Pop()
}
p.knownFragments.Add(hash)
}

// MarkTransaction marks a transaction as known for the peer, ensuring that it
// will never be propagated to this particular peer.
func (p *peer) MarkTransaction(hash common.Hash) {
Expand Down Expand Up @@ -276,6 +291,7 @@ func (p *peer) SendNewProposal(proposal *types.Proposal) error {

// SendNewBlock propagates a vote to a remote peer.
func (p *peer) SendVote(vote *types.Vote) error {
p.knownVotes.Add(vote.Hash())
return p2p.Send(p.rw, VoteMsg, vote)
}

Expand Down Expand Up @@ -533,15 +549,30 @@ func (ps *peerSet) PeersWithoutVote(hash common.Hash) []*peer {
return list
}

// PeersWithoutFragment retrieves a list of peers that do not have a given block fragment
// PeersWithoutBlockFragment retrieves a list of peers that do not have a given block fragment
// in their set of known hashes.
func (ps *peerSet) PeersWithoutBlockFragment(hash common.Hash) []*peer {
ps.lock.RLock()
defer ps.lock.RUnlock()

list := make([]*peer, 0, len(ps.peers))
for _, p := range ps.peers {
if !p.knownBlockFragments.Has(hash) {
list = append(list, p)
}
}
return list
}

// PeersWithoutProposal retrieves a list of peers that do not have a given proposal
// in their set of known hashes.
func (ps *peerSet) PeersWithoutFragment(hash common.Hash) []*peer {
func (ps *peerSet) PeersWithoutProposal(hash common.Hash) []*peer {
ps.lock.RLock()
defer ps.lock.RUnlock()

list := make([]*peer, 0, len(ps.peers))
for _, p := range ps.peers {
if !p.knownFragments.Has(hash) {
if !p.knownProposals.Has(hash) {
list = append(list, p)
}
}
Expand Down
4 changes: 2 additions & 2 deletions client/knode/validator/states.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ func (val *validator) newRoundState() stateFn {
val.block = nil
val.blockFragments = nil

//fixme: should be checked how to revert stateDB state
val.state.RevertToSnapshot(val.state.Snapshot())
parent := val.chain.CurrentBlock()
val.makeCurrent(parent)
}

return val.newProposalState
Expand Down

0 comments on commit 4dd553a

Please sign in to comment.