Skip to content

Commit

Permalink
network: implement pool for Extensible payloads
Browse files Browse the repository at this point in the history
  • Loading branch information
fyrchik committed Jan 27, 2021
1 parent 0aa5070 commit b83dc31
Show file tree
Hide file tree
Showing 11 changed files with 307 additions and 48 deletions.
26 changes: 0 additions & 26 deletions pkg/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,12 @@ type Service interface {
OnPayload(p *npayload.Extensible)
// OnTransaction is a callback to notify Service about new received transaction.
OnTransaction(tx *transaction.Transaction)
// GetPayload returns Payload with specified hash if it is present in the local cache.
GetPayload(h util.Uint256) *npayload.Extensible
}

type service struct {
Config

log *zap.Logger
// cache is a fifo cache which stores recent payloads.
cache *relayCache
// txx is a fifo cache which stores miner transactions.
txx *relayCache
dbft *dbft.DBFT
Expand Down Expand Up @@ -124,7 +120,6 @@ func NewService(cfg Config) (Service, error) {
Config: cfg,

log: cfg.Logger,
cache: newFIFOCache(cacheMaxCapacity),
txx: newFIFOCache(cacheMaxCapacity),
messages: make(chan Payload, 100),

Expand Down Expand Up @@ -379,21 +374,13 @@ func (s *service) payloadFromExtensible(ep *npayload.Extensible) *Payload {
// OnPayload handles Payload receive.
func (s *service) OnPayload(cp *npayload.Extensible) {
log := s.log.With(zap.Stringer("hash", cp.Hash()))
if s.cache.Has(cp.Hash()) {
log.Debug("payload is already in cache")
return
}

p := s.payloadFromExtensible(cp)
p.decodeData()
if !s.validatePayload(p) {
log.Info("can't validate payload")
return
}

s.Config.Broadcast(cp)
s.cache.Add(cp)

if s.dbft == nil || !s.started.Load() {
log.Debug("dbft is inactive or not started yet")
return
Expand All @@ -416,25 +403,12 @@ func (s *service) OnTransaction(tx *transaction.Transaction) {
}
}

// GetPayload returns payload stored in cache.
func (s *service) GetPayload(h util.Uint256) *npayload.Extensible {
p := s.cache.Get(h)
if p == nil {
return (*npayload.Extensible)(nil)
}

cp := *p.(*npayload.Extensible)

return &cp
}

func (s *service) broadcast(p payload.ConsensusPayload) {
if err := p.(*Payload).Sign(s.dbft.Priv.(*privateKey)); err != nil {
s.log.Warn("can't sign consensus payload", zap.Error(err))
}

ep := &p.(*Payload).Extensible
s.cache.Add(ep)
s.Config.Broadcast(ep)
}

Expand Down
6 changes: 0 additions & 6 deletions pkg/consensus/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,6 @@ func TestService_OnPayload(t *testing.T) {
// sender is invalid
srv.OnPayload(&p.Extensible)
shouldNotReceive(t, srv.messages)
require.Nil(t, srv.GetPayload(p.Hash()))

p = new(Payload)
p.SetValidatorIndex(1)
Expand All @@ -360,11 +359,6 @@ func TestService_OnPayload(t *testing.T) {
require.NoError(t, p.Sign(priv))
srv.OnPayload(&p.Extensible)
shouldReceive(t, srv.messages)
require.Equal(t, &p.Extensible, srv.GetPayload(p.Hash()))

// payload has already been received
srv.OnPayload(&p.Extensible)
shouldNotReceive(t, srv.messages)
srv.Chain.Close()
}

Expand Down
67 changes: 67 additions & 0 deletions pkg/core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/encoding/bigint"
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/smartcontract"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/callflag"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/manifest"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
Expand Down Expand Up @@ -130,6 +131,8 @@ type Blockchain struct {

contracts native.Contracts

extensible atomic.Value

// Notification subsystem.
events chan bcEvent
subCh chan interface{}
Expand Down Expand Up @@ -289,6 +292,7 @@ func (bc *Blockchain) init() error {
return fmt.Errorf("can't init cache for Management native contract: %w", err)
}

bc.updateExtensibleWhitelist(bHeight)
return nil
}

Expand Down Expand Up @@ -751,6 +755,7 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
for _, f := range bc.postBlock {
f(bc, txpool, block)
}
bc.updateExtensibleWhitelist(block.Index)
bc.lock.Unlock()

updateBlockHeightMetric(block.Index)
Expand All @@ -763,6 +768,68 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
return nil
}

func (bc *Blockchain) updateExtensibleWhitelist(height uint32) error {
updateCommittee := native.ShouldUpdateCommittee(height, bc)
oracles, oh, err := bc.contracts.Designate.GetDesignatedByRole(bc.dao, native.RoleOracle, height)
if err != nil {
return err
}
stateVals, sh, err := bc.contracts.Designate.GetDesignatedByRole(bc.dao, native.RoleStateValidator, height)
if err != nil {
return err
}

if bc.extensible.Load() != nil && !updateCommittee && oh != height && sh != height {
return nil
}

newList := []util.Uint160{bc.contracts.NEO.GetCommitteeAddress()}
nextVals := bc.contracts.NEO.GetNextBlockValidatorsInternal()
script, err := smartcontract.CreateDefaultMultiSigRedeemScript(nextVals)
if err != nil {
return err
}
newList = append(newList, hash.Hash160(script))
bc.updateExtensibleList(&newList, bc.contracts.NEO.GetNextBlockValidatorsInternal())

if len(oracles) > 0 {
h, err := bc.contracts.Designate.GetLastDesignatedHash(bc.dao, native.RoleOracle)
if err != nil {
return err
}
newList = append(newList, h)
bc.updateExtensibleList(&newList, oracles)
}

if len(stateVals) > 0 {
h, err := bc.contracts.Designate.GetLastDesignatedHash(bc.dao, native.RoleStateValidator)
if err != nil {
return err
}
newList = append(newList, h)
bc.updateExtensibleList(&newList, stateVals)
}

sort.Slice(newList, func(i, j int) bool {
return newList[i].Less(newList[j])
})
bc.extensible.Store(newList)
return nil
}

func (bc *Blockchain) updateExtensibleList(s *[]util.Uint160, pubs keys.PublicKeys) {
for _, pub := range pubs {
*s = append(*s, pub.GetScriptHash())
}
}

// IsExtensibleAllowed determines if script hash is allowed to send extensible payloads.
func (bc *Blockchain) IsExtensibleAllowed(u util.Uint160) bool {
us := bc.extensible.Load().([]util.Uint160)
n := sort.Search(len(us), func(i int) bool { return !us[i].Less(u) })
return n < len(us)
}

func (bc *Blockchain) runPersist(script []byte, block *block.Block, cache *dao.Cached, trig trigger.Type) (*state.AppExecResult, error) {
systemInterop := bc.newInteropContext(trig, cache, block, nil)
v := systemInterop.SpawnVM()
Expand Down
1 change: 1 addition & 0 deletions pkg/core/blockchainer/blockchainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type Blockchainer interface {
CurrentBlockHash() util.Uint256
HasBlock(util.Uint256) bool
HasTransaction(util.Uint256) bool
IsExtensibleAllowed(util.Uint160) bool
GetAppExecResults(util.Uint256, trigger.Type) ([]state.AppExecResult, error)
GetNotaryDepositExpiration(acc util.Uint160) uint32
GetNativeContractScriptHash(string) (util.Uint160, error)
Expand Down
3 changes: 2 additions & 1 deletion pkg/core/native/designate.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ func (s *Designate) getCachedRoleData(r Role) *roleData {
return nil
}

func (s *Designate) getLastDesignatedHash(d dao.DAO, r Role) (util.Uint160, error) {
// GetLastDesignatedHash returns last designated hash of a given role.
func (s *Designate) GetLastDesignatedHash(d dao.DAO, r Role) (util.Uint160, error) {
if !s.isValidRole(r) {
return util.Uint160{}, ErrInvalidRole
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/core/native/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func (o *Oracle) PutRequestInternal(id uint64, req *state.OracleRequest, d dao.D

// GetScriptHash returns script hash or oracle nodes.
func (o *Oracle) GetScriptHash(d dao.DAO) (util.Uint160, error) {
return o.Desig.getLastDesignatedHash(d, RoleOracle)
return o.Desig.GetLastDesignatedHash(d, RoleOracle)
}

// GetOracleNodes returns public keys of oracle nodes.
Expand Down
93 changes: 93 additions & 0 deletions pkg/network/extpool/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package extpool

import (
"errors"
"sync"

"github.com/nspcc-dev/neo-go/pkg/core/blockchainer"
"github.com/nspcc-dev/neo-go/pkg/network/payload"
"github.com/nspcc-dev/neo-go/pkg/util"
)

// Pool represents pool of extensible payloads.
type Pool struct {
lock sync.RWMutex
verified map[util.Uint256]*payload.Extensible
chain blockchainer.Blockchainer
}

// New returns new payload pool using provided chain.
func New(bc blockchainer.Blockchainer) *Pool {
return &Pool{
verified: make(map[util.Uint256]*payload.Extensible),
chain: bc,
}
}

var (
errDisallowedSender = errors.New("disallowed sender")
errInvalidHeight = errors.New("invalid height")
)

// Add adds extensible payload to the pool.
// First return value specifies if payload was new.
// Second one is nil if and only if payload is valid.
func (p *Pool) Add(e *payload.Extensible) (bool, error) {
if ok, err := p.verify(e); err != nil || !ok {
return ok, err
}

p.lock.Lock()
defer p.lock.Unlock()

h := e.Hash()
if _, ok := p.verified[h]; ok {
return false, nil
}
p.verified[h] = e
return true, nil
}

func (p *Pool) verify(e *payload.Extensible) (bool, error) {
if err := p.chain.VerifyWitness(e.Sender, e, &e.Witness, extensibleVerifyMaxGAS); err != nil {
return false, err
}
h := p.chain.BlockHeight()
if h < e.ValidBlockStart || e.ValidBlockEnd <= h {
// We can receive consensus payload for the last or next block
// which leads to unwanted node disconnect.
if e.ValidBlockEnd == h {
return false, nil
}
return false, errInvalidHeight
}
if !p.chain.IsExtensibleAllowed(e.Sender) {
return false, errDisallowedSender
}
return true, nil
}

// Get returns payload by hash.
func (p *Pool) Get(h util.Uint256) *payload.Extensible {
p.lock.RLock()
defer p.lock.RUnlock()

return p.verified[h]
}

const extensibleVerifyMaxGAS = 2000000

// RemoveStale removes invalid payloads after block processing.
func (p *Pool) RemoveStale(index uint32) {
p.lock.Lock()
defer p.lock.Unlock()
for h, e := range p.verified {
if e.ValidBlockEnd <= index || !p.chain.IsExtensibleAllowed(e.Sender) {
delete(p.verified, h)
continue
}
if err := p.chain.VerifyWitness(e.Sender, e, &e.Witness, extensibleVerifyMaxGAS); err != nil {
delete(p.verified, h)
}
}
}
Loading

0 comments on commit b83dc31

Please sign in to comment.