Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

knode/validator: added instruction to reset the state/duplicates #814

Merged
merged 1 commit into from
Oct 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions client/knode/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,11 +622,15 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if !pm.validator.Validating() {
break
}

// Retrieve and decode the propagated proposal
var proposal types.Proposal
if err := msg.Decode(&proposal); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}

p.MarkProposal(proposal.Hash())

if err := pm.validator.AddProposal(&proposal); err != nil {
// ignore
break
Expand All @@ -643,6 +647,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}

p.MarkVote(vote.Hash())

if err := pm.validator.AddVote(&vote); err != nil {
// ignore
break
Expand All @@ -659,7 +664,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}

p.MarkFragment(request.Data.Proof)
p.MarkBlockFragment(request.Data.Proof)

if err := pm.validator.AddBlockFragment(request.BlockNumber, request.Round, request.Data); err != nil {
log.Error("error while adding a new block fragment", "err", err, "round", request.Round, "block", request.BlockNumber, "fragment", request.Data)
// ignore
Expand Down Expand Up @@ -733,11 +739,11 @@ func (pm *ProtocolManager) proposalBroadcastLoop() {
for obj := range pm.proposalSub.Chan() {
switch ev := obj.Data.(type) {
case core.NewProposalEvent:
for _, peer := range pm.peers.Peers() {
for _, peer := range pm.peers.PeersWithoutProposal(ev.Proposal.Hash()) {
peer.SendNewProposal(ev.Proposal)
}
case core.NewBlockFragmentEvent:
for _, peer := range pm.peers.PeersWithoutFragment(ev.Data.Proof) {
for _, peer := range pm.peers.PeersWithoutBlockFragment(ev.Data.Proof) {
peer.SendBlockFragment(ev.BlockNumber, ev.Round, ev.Data)
}
}
Expand Down
93 changes: 62 additions & 31 deletions client/knode/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ const (
// above some healthy uncle limit, so use that.
maxQueuedAnns = 4

maxKnownVotes = 1024 // Maximum vote hashes to keep in the known list (prevent DOS)
maxKnownFragments = 1024 // Maximum vote hashes to keep in the known list (prevent DOS)
maxKnownProposals = 1024 // Maximum proposal hashes to keep in the known list (prevent DOS)
maxKnownBlockFragments = 1024 // Maximum block fragment hashes to keep in the known list (prevent DOS)
maxKnownVotes = 1024 // Maximum vote hashes to keep in the known list (prevent DOS)

handshakeTimeout = 5 * time.Second
)
Expand Down Expand Up @@ -71,10 +72,13 @@ type peer struct {
head common.Hash
lock sync.RWMutex

knownTxs *set.Set // Set of transaction hashes known to be known by this peer
knownBlocks *set.Set // Set of block hashes known to be known by this peer
knownVotes *set.Set // set of vote hashes known to be known by this peer
knownFragments *set.Set // set of fragment hashes known to be known by this peer
knownTxs *set.Set
knownBlocks *set.Set

// consensus
knownProposals *set.Set
knownBlockFragments *set.Set
knownVotes *set.Set

queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer
queuedProps chan *propEvent // Queue of blocks to broadcast to the peer
Expand All @@ -84,18 +88,19 @@ type peer struct {

func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
return &peer{
Peer: p,
rw: rw,
version: version,
id: fmt.Sprintf("%x", p.ID().Bytes()[:8]),
knownTxs: set.New(),
knownBlocks: set.New(),
knownVotes: set.New(),
knownFragments: set.New(),
queuedTxs: make(chan []*types.Transaction, maxQueuedTxs),
queuedProps: make(chan *propEvent, maxQueuedProps),
queuedAnns: make(chan *types.Block, maxQueuedAnns),
term: make(chan struct{}),
Peer: p,
rw: rw,
version: version,
id: fmt.Sprintf("%x", p.ID().Bytes()[:8]),
knownTxs: set.New(),
knownBlocks: set.New(),
knownProposals: set.New(),
knownBlockFragments: set.New(),
knownVotes: set.New(),
queuedTxs: make(chan []*types.Transaction, maxQueuedTxs),
queuedProps: make(chan *propEvent, maxQueuedProps),
queuedAnns: make(chan *types.Block, maxQueuedAnns),
term: make(chan struct{}),
}
}

Expand Down Expand Up @@ -174,6 +179,26 @@ func (p *peer) MarkBlock(hash common.Hash) {
p.knownBlocks.Add(hash)
}

// MarkProposal marks a proposal as known for the peer, ensuring that the
// proposal will never be propagated to this particular peer.
func (p *peer) MarkProposal(hash common.Hash) {
// If we reached the memory allowance, drop a previously known fragment hash
for p.knownProposals.Size() >= maxKnownProposals {
p.knownProposals.Pop()
}
p.knownProposals.Add(hash)
}

// MarkFragment marks a block fragment as known for the peer, ensuring that the
// fragment will never be propagated to this particular peer.
func (p *peer) MarkBlockFragment(hash common.Hash) {
// If we reached the memory allowance, drop a previously known fragment hash
for p.knownBlockFragments.Size() >= maxKnownBlockFragments {
p.knownBlockFragments.Pop()
}
p.knownBlockFragments.Add(hash)
}

// MarkVote marks a vote as known for the peer, ensuring that the block will
// never be propagated to this particular peer.
func (p *peer) MarkVote(hash common.Hash) {
Expand All @@ -184,16 +209,6 @@ func (p *peer) MarkVote(hash common.Hash) {
p.knownVotes.Add(hash)
}

// MarkFragment marks a block fragment as known for the peer, ensuring that the
// fragment will never be propagated to this particular peer.
func (p *peer) MarkFragment(hash common.Hash) {
// If we reached the memory allowance, drop a previously known fragment hash
for p.knownFragments.Size() >= maxKnownFragments {
p.knownFragments.Pop()
}
p.knownFragments.Add(hash)
}

// MarkTransaction marks a transaction as known for the peer, ensuring that it
// will never be propagated to this particular peer.
func (p *peer) MarkTransaction(hash common.Hash) {
Expand Down Expand Up @@ -276,6 +291,7 @@ func (p *peer) SendNewProposal(proposal *types.Proposal) error {

// SendNewBlock propagates a vote to a remote peer.
func (p *peer) SendVote(vote *types.Vote) error {
p.knownVotes.Add(vote.Hash())
return p2p.Send(p.rw, VoteMsg, vote)
}

Expand Down Expand Up @@ -533,15 +549,30 @@ func (ps *peerSet) PeersWithoutVote(hash common.Hash) []*peer {
return list
}

// PeersWithoutFragment retrieves a list of peers that do not have a given block fragment
// PeersWithoutBlockFragment retrieves a list of peers that do not have a given block fragment
// in their set of known hashes.
func (ps *peerSet) PeersWithoutBlockFragment(hash common.Hash) []*peer {
ps.lock.RLock()
defer ps.lock.RUnlock()

list := make([]*peer, 0, len(ps.peers))
for _, p := range ps.peers {
if !p.knownBlockFragments.Has(hash) {
list = append(list, p)
}
}
return list
}

// PeersWithoutProposal retrieves a list of peers that do not have a given proposal
// in their set of known hashes.
func (ps *peerSet) PeersWithoutFragment(hash common.Hash) []*peer {
func (ps *peerSet) PeersWithoutProposal(hash common.Hash) []*peer {
ps.lock.RLock()
defer ps.lock.RUnlock()

list := make([]*peer, 0, len(ps.peers))
for _, p := range ps.peers {
if !p.knownFragments.Has(hash) {
if !p.knownProposals.Has(hash) {
list = append(list, p)
}
}
Expand Down
4 changes: 2 additions & 2 deletions client/knode/validator/states.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ func (val *validator) newRoundState() stateFn {
val.block = nil
val.blockFragments = nil

//fixme: should be checked how to revert stateDB state
val.state.RevertToSnapshot(val.state.Snapshot())
parent := val.chain.CurrentBlock()
val.makeCurrent(parent)
}

return val.newProposalState
Expand Down