Skip to content

Commit

Permalink
Merge pull request #1674 from nspcc-dev/extensible_pool
Browse files Browse the repository at this point in the history
Add pool for `Extensible` payloads
  • Loading branch information
roman-khimov authored Jan 28, 2021
2 parents 9a99054 + 9592f3e commit a6921ce
Show file tree
Hide file tree
Showing 12 changed files with 378 additions and 75 deletions.
26 changes: 0 additions & 26 deletions pkg/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,12 @@ type Service interface {
OnPayload(p *npayload.Extensible)
// OnTransaction is a callback to notify Service about new received transaction.
OnTransaction(tx *transaction.Transaction)
// GetPayload returns Payload with specified hash if it is present in the local cache.
GetPayload(h util.Uint256) *npayload.Extensible
}

type service struct {
Config

log *zap.Logger
// cache is a fifo cache which stores recent payloads.
cache *relayCache
// txx is a fifo cache which stores miner transactions.
txx *relayCache
dbft *dbft.DBFT
Expand Down Expand Up @@ -124,7 +120,6 @@ func NewService(cfg Config) (Service, error) {
Config: cfg,

log: cfg.Logger,
cache: newFIFOCache(cacheMaxCapacity),
txx: newFIFOCache(cacheMaxCapacity),
messages: make(chan Payload, 100),

Expand Down Expand Up @@ -379,21 +374,13 @@ func (s *service) payloadFromExtensible(ep *npayload.Extensible) *Payload {
// OnPayload handles Payload receive.
func (s *service) OnPayload(cp *npayload.Extensible) {
log := s.log.With(zap.Stringer("hash", cp.Hash()))
if s.cache.Has(cp.Hash()) {
log.Debug("payload is already in cache")
return
}

p := s.payloadFromExtensible(cp)
p.decodeData()
if !s.validatePayload(p) {
log.Info("can't validate payload")
return
}

s.Config.Broadcast(cp)
s.cache.Add(cp)

if s.dbft == nil || !s.started.Load() {
log.Debug("dbft is inactive or not started yet")
return
Expand All @@ -416,25 +403,12 @@ func (s *service) OnTransaction(tx *transaction.Transaction) {
}
}

// GetPayload returns payload stored in cache.
func (s *service) GetPayload(h util.Uint256) *npayload.Extensible {
p := s.cache.Get(h)
if p == nil {
return (*npayload.Extensible)(nil)
}

cp := *p.(*npayload.Extensible)

return &cp
}

func (s *service) broadcast(p payload.ConsensusPayload) {
if err := p.(*Payload).Sign(s.dbft.Priv.(*privateKey)); err != nil {
s.log.Warn("can't sign consensus payload", zap.Error(err))
}

ep := &p.(*Payload).Extensible
s.cache.Add(ep)
s.Config.Broadcast(ep)
}

Expand Down
6 changes: 0 additions & 6 deletions pkg/consensus/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,6 @@ func TestService_OnPayload(t *testing.T) {
// sender is invalid
srv.OnPayload(&p.Extensible)
shouldNotReceive(t, srv.messages)
require.Nil(t, srv.GetPayload(p.Hash()))

p = new(Payload)
p.SetValidatorIndex(1)
Expand All @@ -360,11 +359,6 @@ func TestService_OnPayload(t *testing.T) {
require.NoError(t, p.Sign(priv))
srv.OnPayload(&p.Extensible)
shouldReceive(t, srv.messages)
require.Equal(t, &p.Extensible, srv.GetPayload(p.Hash()))

// payload has already been received
srv.OnPayload(&p.Extensible)
shouldNotReceive(t, srv.messages)
srv.Chain.Close()
}

Expand Down
70 changes: 69 additions & 1 deletion pkg/core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ type Blockchain struct {

contracts native.Contracts

extensible atomic.Value

// Notification subsystem.
events chan bcEvent
subCh chan interface{}
Expand Down Expand Up @@ -297,7 +299,7 @@ func (bc *Blockchain) init() error {
return fmt.Errorf("can't init cache for Management native contract: %w", err)
}

return nil
return bc.updateExtensibleWhitelist(bHeight)
}

// Run runs chain loop, it needs to be run as goroutine and executing it is
Expand Down Expand Up @@ -759,6 +761,10 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
for _, f := range bc.postBlock {
f(bc, txpool, block)
}
if err := bc.updateExtensibleWhitelist(block.Index); err != nil {
bc.lock.Unlock()
return err
}
bc.lock.Unlock()

updateBlockHeightMetric(block.Index)
Expand All @@ -771,6 +777,68 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
return nil
}

func (bc *Blockchain) updateExtensibleWhitelist(height uint32) error {
updateCommittee := native.ShouldUpdateCommittee(height, bc)
oracles, oh, err := bc.contracts.Designate.GetDesignatedByRole(bc.dao, native.RoleOracle, height)
if err != nil {
return err
}
stateVals, sh, err := bc.contracts.Designate.GetDesignatedByRole(bc.dao, native.RoleStateValidator, height)
if err != nil {
return err
}

if bc.extensible.Load() != nil && !updateCommittee && oh != height && sh != height {
return nil
}

newList := []util.Uint160{bc.contracts.NEO.GetCommitteeAddress()}
nextVals := bc.contracts.NEO.GetNextBlockValidatorsInternal()
script, err := smartcontract.CreateDefaultMultiSigRedeemScript(nextVals)
if err != nil {
return err
}
newList = append(newList, hash.Hash160(script))
bc.updateExtensibleList(&newList, bc.contracts.NEO.GetNextBlockValidatorsInternal())

if len(oracles) > 0 {
h, err := bc.contracts.Designate.GetLastDesignatedHash(bc.dao, native.RoleOracle)
if err != nil {
return err
}
newList = append(newList, h)
bc.updateExtensibleList(&newList, oracles)
}

if len(stateVals) > 0 {
h, err := bc.contracts.Designate.GetLastDesignatedHash(bc.dao, native.RoleStateValidator)
if err != nil {
return err
}
newList = append(newList, h)
bc.updateExtensibleList(&newList, stateVals)
}

sort.Slice(newList, func(i, j int) bool {
return newList[i].Less(newList[j])
})
bc.extensible.Store(newList)
return nil
}

func (bc *Blockchain) updateExtensibleList(s *[]util.Uint160, pubs keys.PublicKeys) {
for _, pub := range pubs {
*s = append(*s, pub.GetScriptHash())
}
}

// IsExtensibleAllowed determines if script hash is allowed to send extensible payloads.
func (bc *Blockchain) IsExtensibleAllowed(u util.Uint160) bool {
us := bc.extensible.Load().([]util.Uint160)
n := sort.Search(len(us), func(i int) bool { return !us[i].Less(u) })
return n < len(us)
}

func (bc *Blockchain) runPersist(script []byte, block *block.Block, cache *dao.Cached, trig trigger.Type) (*state.AppExecResult, error) {
systemInterop := bc.newInteropContext(trig, cache, block, nil)
v := systemInterop.SpawnVM()
Expand Down
1 change: 1 addition & 0 deletions pkg/core/blockchainer/blockchainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Blockchainer interface {
CurrentBlockHash() util.Uint256
HasBlock(util.Uint256) bool
HasTransaction(util.Uint256) bool
IsExtensibleAllowed(util.Uint160) bool
GetAppExecResults(util.Uint256, trigger.Type) ([]state.AppExecResult, error)
GetNotaryDepositExpiration(acc util.Uint160) uint32
GetNativeContractScriptHash(string) (util.Uint160, error)
Expand Down
91 changes: 64 additions & 27 deletions pkg/core/native/designate.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,16 @@ type Designate struct {

rolesChangedFlag atomic.Value
oracles atomic.Value
stateVals atomic.Value
notaries atomic.Value

// p2pSigExtensionsEnabled defines whether the P2P signature extensions logic is relevant.
p2pSigExtensionsEnabled bool

OracleService atomic.Value
}

type oraclesData struct {
type roleData struct {
nodes keys.PublicKeys
addr util.Uint160
height uint32
Expand Down Expand Up @@ -109,20 +111,18 @@ func (s *Designate) PostPersist(ic *interop.Context) error {
return nil
}

nodeKeys, height, err := s.GetDesignatedByRole(ic.DAO, RoleOracle, math.MaxUint32)
if err != nil {
if err := s.updateCachedRoleData(&s.oracles, ic.DAO, RoleOracle); err != nil {
return err
}

od := &oraclesData{
nodes: nodeKeys,
addr: oracleHashFromNodes(nodeKeys),
height: height,
if err := s.updateCachedRoleData(&s.stateVals, ic.DAO, RoleStateValidator); err != nil {
return err
}
s.oracles.Store(od)
if orc, _ := s.OracleService.Load().(services.Oracle); orc != nil {
orc.UpdateOracleNodes(od.nodes.Copy())
if s.p2pSigExtensionsEnabled {
if err := s.updateCachedRoleData(&s.notaries, ic.DAO, RoleP2PNotary); err != nil {
return err
}
}

s.rolesChangedFlag.Store(false)
return nil
}
Expand Down Expand Up @@ -157,45 +157,82 @@ func (s *Designate) rolesChanged() bool {
return rc == nil || rc.(bool)
}

func oracleHashFromNodes(nodes keys.PublicKeys) util.Uint160 {
func (s *Designate) hashFromNodes(r Role, nodes keys.PublicKeys) util.Uint160 {
if len(nodes) == 0 {
return util.Uint160{}
}
script, _ := smartcontract.CreateMajorityMultiSigRedeemScript(nodes.Copy())
var script []byte
switch r {
case RoleOracle:
script, _ = smartcontract.CreateDefaultMultiSigRedeemScript(nodes.Copy())
case RoleP2PNotary:
script, _ = smartcontract.CreateMultiSigRedeemScript(1, nodes.Copy())
default:
script, _ = smartcontract.CreateMajorityMultiSigRedeemScript(nodes.Copy())
}
return hash.Hash160(script)
}

func (s *Designate) getLastDesignatedHash(d dao.DAO, r Role) (util.Uint160, error) {
func (s *Designate) updateCachedRoleData(v *atomic.Value, d dao.DAO, r Role) error {
nodeKeys, height, err := s.GetDesignatedByRole(d, r, math.MaxUint32)
if err != nil {
return err
}
v.Store(&roleData{
nodes: nodeKeys,
addr: s.hashFromNodes(r, nodeKeys),
height: height,
})
if r == RoleOracle {
if orc, _ := s.OracleService.Load().(services.Oracle); orc != nil {
orc.UpdateOracleNodes(nodeKeys.Copy())
}
}
return nil
}

func (s *Designate) getCachedRoleData(r Role) *roleData {
var val interface{}
switch r {
case RoleOracle:
val = s.oracles.Load()
case RoleStateValidator:
val = s.stateVals.Load()
case RoleP2PNotary:
val = s.notaries.Load()
}
if val != nil {
return val.(*roleData)
}
return nil
}

// GetLastDesignatedHash returns last designated hash of a given role.
func (s *Designate) GetLastDesignatedHash(d dao.DAO, r Role) (util.Uint160, error) {
if !s.isValidRole(r) {
return util.Uint160{}, ErrInvalidRole
}
if r == RoleOracle && !s.rolesChanged() {
odVal := s.oracles.Load()
if odVal != nil {
od := odVal.(*oraclesData)
return od.addr, nil
if !s.rolesChanged() {
if val := s.getCachedRoleData(r); val != nil {
return val.addr, nil
}
}
nodes, _, err := s.GetDesignatedByRole(d, r, math.MaxUint32)
if err != nil {
return util.Uint160{}, err
}
// We only have hashing defined for oracles now.
return oracleHashFromNodes(nodes), nil
return s.hashFromNodes(r, nodes), nil
}

// GetDesignatedByRole returns nodes for role r.
func (s *Designate) GetDesignatedByRole(d dao.DAO, r Role, index uint32) (keys.PublicKeys, uint32, error) {
if !s.isValidRole(r) {
return nil, 0, ErrInvalidRole
}
if r == RoleOracle && !s.rolesChanged() {
odVal := s.oracles.Load()
if odVal != nil {
od := odVal.(*oraclesData)
if od.height <= index {
return od.nodes, od.height, nil
}
if !s.rolesChanged() {
if val := s.getCachedRoleData(r); val != nil && val.height <= index {
return val.nodes.Copy(), val.height, nil
}
}
kvs, err := d.GetStorageItemsWithPrefix(s.ContractID, []byte{byte(r)})
Expand Down
2 changes: 1 addition & 1 deletion pkg/core/native/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ func (o *Oracle) PutRequestInternal(id uint64, req *state.OracleRequest, d dao.D

// GetScriptHash returns script hash or oracle nodes.
func (o *Oracle) GetScriptHash(d dao.DAO) (util.Uint160, error) {
return o.Desig.getLastDesignatedHash(d, RoleOracle)
return o.Desig.GetLastDesignatedHash(d, RoleOracle)
}

// GetOracleNodes returns public keys of oracle nodes.
Expand Down
7 changes: 7 additions & 0 deletions pkg/core/native_designate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ func TestDesignate_DesignateAsRoleTx(t *testing.T) {
bc.getNodesByRole(t, false, native.RoleOracle, 100500, 0)
bc.getNodesByRole(t, true, native.RoleOracle, 0, 0) // returns an empty list
bc.getNodesByRole(t, true, native.RoleOracle, index, 1) // returns pubs

priv1, err := keys.NewPrivateKey()
require.NoError(t, err)
pubs = keys.PublicKeys{priv1.PublicKey()}
bc.setNodesByRole(t, true, native.RoleStateValidator, pubs)
bc.getNodesByRole(t, true, native.RoleStateValidator, bc.BlockHeight()+1, 1)

}

func TestDesignate_DesignateAsRole(t *testing.T) {
Expand Down
Loading

0 comments on commit a6921ce

Please sign in to comment.