diff --git a/client/knode/handler.go b/client/knode/handler.go index dabbeedf9..d0f517787 100644 --- a/client/knode/handler.go +++ b/client/knode/handler.go @@ -622,11 +622,15 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if !pm.validator.Validating() { break } + // Retrieve and decode the propagated proposal var proposal types.Proposal if err := msg.Decode(&proposal); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } + + p.MarkProposal(proposal.Hash()) + if err := pm.validator.AddProposal(&proposal); err != nil { // ignore break @@ -643,6 +647,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } p.MarkVote(vote.Hash()) + if err := pm.validator.AddVote(&vote); err != nil { // ignore break @@ -659,7 +664,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { return errResp(ErrDecode, "msg %v: %v", msg, err) } - p.MarkFragment(request.Data.Proof) + p.MarkBlockFragment(request.Data.Proof) + if err := pm.validator.AddBlockFragment(request.BlockNumber, request.Round, request.Data); err != nil { log.Error("error while adding a new block fragment", "err", err, "round", request.Round, "block", request.BlockNumber, "fragment", request.Data) // ignore @@ -733,11 +739,11 @@ func (pm *ProtocolManager) proposalBroadcastLoop() { for obj := range pm.proposalSub.Chan() { switch ev := obj.Data.(type) { case core.NewProposalEvent: - for _, peer := range pm.peers.Peers() { + for _, peer := range pm.peers.PeersWithoutProposal(ev.Proposal.Hash()) { peer.SendNewProposal(ev.Proposal) } case core.NewBlockFragmentEvent: - for _, peer := range pm.peers.PeersWithoutFragment(ev.Data.Proof) { + for _, peer := range pm.peers.PeersWithoutBlockFragment(ev.Data.Proof) { peer.SendBlockFragment(ev.BlockNumber, ev.Round, ev.Data) } } diff --git a/client/knode/peer.go b/client/knode/peer.go index 0f2cfe9b0..fe311ffc9 100644 --- a/client/knode/peer.go +++ b/client/knode/peer.go @@ -40,8 +40,9 @@ const ( // above some healthy uncle limit, so use that. maxQueuedAnns = 4 - maxKnownVotes = 1024 // Maximum vote hashes to keep in the known list (prevent DOS) - maxKnownFragments = 1024 // Maximum vote hashes to keep in the known list (prevent DOS) + maxKnownProposals = 1024 // Maximum proposal hashes to keep in the known list (prevent DOS) + maxKnownBlockFragments = 1024 // Maximum block fragment hashes to keep in the known list (prevent DOS) + maxKnownVotes = 1024 // Maximum vote hashes to keep in the known list (prevent DOS) handshakeTimeout = 5 * time.Second ) @@ -71,10 +72,13 @@ type peer struct { head common.Hash lock sync.RWMutex - knownTxs *set.Set // Set of transaction hashes known to be known by this peer - knownBlocks *set.Set // Set of block hashes known to be known by this peer - knownVotes *set.Set // set of vote hashes known to be known by this peer - knownFragments *set.Set // set of fragment hashes known to be known by this peer + knownTxs *set.Set + knownBlocks *set.Set + + // consensus + knownProposals *set.Set + knownBlockFragments *set.Set + knownVotes *set.Set queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer queuedProps chan *propEvent // Queue of blocks to broadcast to the peer @@ -84,18 +88,19 @@ type peer struct { func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { return &peer{ - Peer: p, - rw: rw, - version: version, - id: fmt.Sprintf("%x", p.ID().Bytes()[:8]), - knownTxs: set.New(), - knownBlocks: set.New(), - knownVotes: set.New(), - knownFragments: set.New(), - queuedTxs: make(chan []*types.Transaction, maxQueuedTxs), - queuedProps: make(chan *propEvent, maxQueuedProps), - queuedAnns: make(chan *types.Block, maxQueuedAnns), - term: make(chan struct{}), + Peer: p, + rw: rw, + version: version, + id: fmt.Sprintf("%x", p.ID().Bytes()[:8]), + knownTxs: set.New(), + knownBlocks: set.New(), + knownProposals: set.New(), + knownBlockFragments: set.New(), + knownVotes: set.New(), + queuedTxs: make(chan []*types.Transaction, maxQueuedTxs), + queuedProps: make(chan *propEvent, maxQueuedProps), + queuedAnns: make(chan *types.Block, maxQueuedAnns), + term: make(chan struct{}), } } @@ -174,6 +179,26 @@ func (p *peer) MarkBlock(hash common.Hash) { p.knownBlocks.Add(hash) } +// MarkProposal marks a proposal as known for the peer, ensuring that the +// proposal will never be propagated to this particular peer. +func (p *peer) MarkProposal(hash common.Hash) { + // If we reached the memory allowance, drop a previously known fragment hash + for p.knownProposals.Size() >= maxKnownProposals { + p.knownProposals.Pop() + } + p.knownProposals.Add(hash) +} + +// MarkFragment marks a block fragment as known for the peer, ensuring that the +// fragment will never be propagated to this particular peer. +func (p *peer) MarkBlockFragment(hash common.Hash) { + // If we reached the memory allowance, drop a previously known fragment hash + for p.knownBlockFragments.Size() >= maxKnownBlockFragments { + p.knownBlockFragments.Pop() + } + p.knownBlockFragments.Add(hash) +} + // MarkVote marks a vote as known for the peer, ensuring that the block will // never be propagated to this particular peer. func (p *peer) MarkVote(hash common.Hash) { @@ -184,16 +209,6 @@ func (p *peer) MarkVote(hash common.Hash) { p.knownVotes.Add(hash) } -// MarkFragment marks a block fragment as known for the peer, ensuring that the -// fragment will never be propagated to this particular peer. -func (p *peer) MarkFragment(hash common.Hash) { - // If we reached the memory allowance, drop a previously known fragment hash - for p.knownFragments.Size() >= maxKnownFragments { - p.knownFragments.Pop() - } - p.knownFragments.Add(hash) -} - // MarkTransaction marks a transaction as known for the peer, ensuring that it // will never be propagated to this particular peer. func (p *peer) MarkTransaction(hash common.Hash) { @@ -276,6 +291,7 @@ func (p *peer) SendNewProposal(proposal *types.Proposal) error { // SendNewBlock propagates a vote to a remote peer. func (p *peer) SendVote(vote *types.Vote) error { + p.knownVotes.Add(vote.Hash()) return p2p.Send(p.rw, VoteMsg, vote) } @@ -533,15 +549,30 @@ func (ps *peerSet) PeersWithoutVote(hash common.Hash) []*peer { return list } -// PeersWithoutFragment retrieves a list of peers that do not have a given block fragment +// PeersWithoutBlockFragment retrieves a list of peers that do not have a given block fragment +// in their set of known hashes. +func (ps *peerSet) PeersWithoutBlockFragment(hash common.Hash) []*peer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + list := make([]*peer, 0, len(ps.peers)) + for _, p := range ps.peers { + if !p.knownBlockFragments.Has(hash) { + list = append(list, p) + } + } + return list +} + +// PeersWithoutProposal retrieves a list of peers that do not have a given proposal // in their set of known hashes. -func (ps *peerSet) PeersWithoutFragment(hash common.Hash) []*peer { +func (ps *peerSet) PeersWithoutProposal(hash common.Hash) []*peer { ps.lock.RLock() defer ps.lock.RUnlock() list := make([]*peer, 0, len(ps.peers)) for _, p := range ps.peers { - if !p.knownFragments.Has(hash) { + if !p.knownProposals.Has(hash) { list = append(list, p) } } diff --git a/client/knode/validator/states.go b/client/knode/validator/states.go index a796c57c2..c88216ba0 100644 --- a/client/knode/validator/states.go +++ b/client/knode/validator/states.go @@ -123,8 +123,8 @@ func (val *validator) newRoundState() stateFn { val.block = nil val.blockFragments = nil - //fixme: should be checked how to revert stateDB state - val.state.RevertToSnapshot(val.state.Snapshot()) + parent := val.chain.CurrentBlock() + val.makeCurrent(parent) } return val.newProposalState