Skip to content

Commit

Permalink
Support multi sequencer (cosmos#823)
Browse files Browse the repository at this point in the history
fixes rollkit/rollkit#819 and
rollkit/rollkit#816

---------

Co-authored-by: Ganesha Upadhyaya <gupadhyaya@Ganeshas-MacBook-Pro-2.local>
Co-authored-by: Manav Aggarwal <manavaggarwal1234@gmail.com>
Co-authored-by: nashqueue <99758629+nashqueue@users.noreply.github.com>
  • Loading branch information
4 people authored Mar 30, 2023
1 parent 835a470 commit a1d058c
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 51 deletions.
57 changes: 48 additions & 9 deletions block/manager.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package block

import (
"bytes"
"context"
"encoding/hex"
"fmt"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -68,6 +70,8 @@ type Manager struct {
// retrieveCond is used to notify sync goroutine (SyncLoop) that it needs to retrieve data
retrieveCond *sync.Cond

lastStateMtx *sync.Mutex

logger log.Logger

// For usage by Lazy Aggregator mode
Expand Down Expand Up @@ -151,6 +155,7 @@ func NewManager(
blockInCh: make(chan newBlockEvent, 100),
FraudProofInCh: make(chan *abci.FraudProof, 100),
retrieveMtx: new(sync.Mutex),
lastStateMtx: new(sync.Mutex),
syncCache: make(map[uint64]*types.Block),
logger: logger,
txsAvailable: txsAvailableCh,
Expand Down Expand Up @@ -340,10 +345,18 @@ func (m *Manager) trySyncNextBlock(ctx context.Context, daHeight uint64) error {
return fmt.Errorf("failed to save block responses: %w", err)
}

// SaveValidators commits the DB tx
err = m.store.SaveValidators(uint64(b.SignedHeader.Header.Height()), m.lastState.Validators)
if err != nil {
return err
}

if daHeight > newState.DAHeight {
newState.DAHeight = daHeight
}
m.lastStateMtx.Lock()
m.lastState = newState
m.lastStateMtx.Unlock()
err = m.store.UpdateState(m.lastState)
if err != nil {
m.logger.Error("failed to save updated state", "error", err)
Expand Down Expand Up @@ -449,13 +462,37 @@ func (m *Manager) getCommit(header types.Header) (*types.Commit, error) {
}, nil
}

func (m *Manager) IsProposer() (bool, error) {
// if proposer is not set, assume self proposer
if m.lastState.Validators.Proposer == nil {
return true, nil
}

signerPubBytes, err := m.proposerKey.GetPublic().Raw()
if err != nil {
return false, err
}

return bytes.Equal(m.lastState.Validators.Proposer.PubKey.Bytes(), signerPubBytes), nil
}

func (m *Manager) publishBlock(ctx context.Context) error {
var lastCommit *types.Commit
var lastHeaderHash types.Hash
var err error
height := m.store.Height()
newHeight := height + 1

m.lastStateMtx.Lock()
isProposer, err := m.IsProposer()
m.lastStateMtx.Unlock()
if err != nil {
return fmt.Errorf("error while checking for proposer: %w", err)
}
if !isProposer {
return nil
}

// this is a special case, when first block is produced - there is no previous commit
if newHeight == uint64(m.genesis.InitialHeight) {
lastCommit = &types.Commit{}
Expand Down Expand Up @@ -527,6 +564,9 @@ func (m *Manager) publishBlock(ctx context.Context) error {
return err
}

// Only update the stored height after successfully submitting to DA layer and committing to the DB
m.store.SetHeight(uint64(block.SignedHeader.Header.Height()))

// Commit the new state and block which writes to disk on the proxy app
_, _, err = m.executor.Commit(ctx, newState, block, responses)
if err != nil {
Expand All @@ -539,6 +579,12 @@ func (m *Manager) publishBlock(ctx context.Context) error {
return err
}

// SaveValidators commits the DB tx
err = m.store.SaveValidators(uint64(block.SignedHeader.Header.Height()), m.lastState.Validators)
if err != nil {
return err
}

newState.DAHeight = atomic.LoadUint64(&m.daHeight)
// After this call m.lastState is the NEW state returned from ApplyBlock
m.lastState = newState
Expand All @@ -549,18 +595,11 @@ func (m *Manager) publishBlock(ctx context.Context) error {
return err
}

// SaveValidators commits the DB tx
err = m.store.SaveValidators(uint64(block.SignedHeader.Header.Height()), m.lastState.Validators)
if err != nil {
return err
}

// Only update the stored height after successfully submitting to DA layer and committing to the DB
m.store.SetHeight(uint64(block.SignedHeader.Header.Height()))

// Publish header to channel so that header exchange service can broadcast
m.HeaderCh <- &block.SignedHeader

m.logger.Debug("successfully proposed block", "proposer", hex.EncodeToString(block.SignedHeader.ProposerAddress), "height", block.SignedHeader.Height())

return nil
}

Expand Down
120 changes: 79 additions & 41 deletions node/full_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ import (
"github.com/rollkit/rollkit/config"
"github.com/rollkit/rollkit/conv"
abciconv "github.com/rollkit/rollkit/conv/abci"
mockda "github.com/rollkit/rollkit/da/mock"
"github.com/rollkit/rollkit/mocks"
"github.com/rollkit/rollkit/store"
"github.com/rollkit/rollkit/types"
)

Expand Down Expand Up @@ -648,60 +650,73 @@ func TestBlockchainInfo(t *testing.T) {
}

func TestValidatorSetHandling(t *testing.T) {
// handle multiple sequencers
t.Skip()

assert := assert.New(t)
require := require.New(t)
app := &mocks.Application{}
app.On("InitChain", mock.Anything).Return(abci.ResponseInitChain{})
app.On("CheckTx", mock.Anything).Return(abci.ResponseCheckTx{})
app.On("BeginBlock", mock.Anything).Return(abci.ResponseBeginBlock{})
app.On("Commit", mock.Anything).Return(abci.ResponseCommit{})
app.On("GetAppHash", mock.Anything).Return(abci.ResponseGetAppHash{})
app.On("GenerateFraudProof", mock.Anything).Return(abci.ResponseGenerateFraudProof{})

key, _, _ := crypto.GenerateEd25519Key(crand.Reader)
waitCh := make(chan interface{})

vKeys := make([]tmcrypto.PrivKey, 2)
apps := make([]*mocks.Application, 2)
nodes := make([]*FullNode, 2)

vKeys := make([]tmcrypto.PrivKey, 4)
genesisValidators := make([]tmtypes.GenesisValidator, len(vKeys))
for i := 0; i < len(vKeys); i++ {
vKeys[i] = ed25519.GenPrivKey()
genesisValidators[i] = tmtypes.GenesisValidator{Address: vKeys[i].PubKey().Address(), PubKey: vKeys[i].PubKey(), Power: int64(i + 100), Name: fmt.Sprintf("gen #%d", i)}
apps[i] = createApp(vKeys[0], waitCh, require)
}

nodeKey := &p2p.NodeKey{
PrivKey: vKeys[0],
}
signingKey, _ := conv.GetNodeKey(nodeKey)

pbValKey, err := encoding.PubKeyToProto(vKeys[0].PubKey())
require.NoError(err)

waitCh := make(chan interface{})

app.On("EndBlock", mock.Anything).Return(abci.ResponseEndBlock{}).Times(5)
app.On("EndBlock", mock.Anything).Return(abci.ResponseEndBlock{ValidatorUpdates: []abci.ValidatorUpdate{{PubKey: pbValKey, Power: 0}}}).Once()
app.On("EndBlock", mock.Anything).Return(abci.ResponseEndBlock{}).Once()
app.On("EndBlock", mock.Anything).Return(abci.ResponseEndBlock{ValidatorUpdates: []abci.ValidatorUpdate{{PubKey: pbValKey, Power: 100}}}).Once()
app.On("EndBlock", mock.Anything).Return(abci.ResponseEndBlock{}).Run(func(args mock.Arguments) {
waitCh <- nil
})
dalc := &mockda.DataAvailabilityLayerClient{}
ds, err := store.NewDefaultInMemoryKVStore()
require.Nil(err)
err = dalc.Init([8]byte{}, nil, ds, log.TestingLogger())
require.Nil(err)
err = dalc.Start()
require.Nil(err)

for i := 0; i < len(nodes); i++ {
nodeKey := &p2p.NodeKey{
PrivKey: vKeys[i],
}
signingKey, err := conv.GetNodeKey(nodeKey)
require.NoError(err)
nodes[i], err = newFullNode(
context.Background(),
config.NodeConfig{
DALayer: "mock",
Aggregator: true,
BlockManagerConfig: config.BlockManagerConfig{
BlockTime: 1 * time.Second,
DABlockTime: 100 * time.Millisecond,
},
},
signingKey,
signingKey,
abcicli.NewLocalClient(nil, apps[i]),
&tmtypes.GenesisDoc{ChainID: "test", Validators: genesisValidators},
log.TestingLogger(),
)
require.NoError(err)
require.NotNil(nodes[i])

node, err := newFullNode(context.Background(), config.NodeConfig{DALayer: "mock", Aggregator: true, BlockManagerConfig: config.BlockManagerConfig{BlockTime: 10 * time.Millisecond}}, key, signingKey, abcicli.NewLocalClient(nil, app), &tmtypes.GenesisDoc{ChainID: "test", Validators: genesisValidators}, log.TestingLogger())
require.NoError(err)
require.NotNil(node)
// use same, common DALC, so nodes can share data
nodes[i].dalc = dalc
nodes[i].blockManager.SetDALC(dalc)
}

rpc := NewFullClient(node)
rpc := NewFullClient(nodes[0])
require.NotNil(rpc)

err = node.Start()
require.NoError(err)
for i := 0; i < len(nodes); i++ {
err := nodes[i].Start()
require.NoError(err)
}

<-waitCh
<-waitCh

// test first blocks
for h := int64(1); h <= 6; h++ {
for h := int64(1); h <= 3; h++ {
vals, err := rpc.Validators(context.Background(), &h, nil, nil)
assert.NoError(err)
assert.NotNil(vals)
Expand All @@ -710,8 +725,8 @@ func TestValidatorSetHandling(t *testing.T) {
assert.EqualValues(vals.BlockHeight, h)
}

// 6th EndBlock removes first validator from the list
for h := int64(7); h <= 8; h++ {
// 3rd EndBlock removes the first validator from the list
for h := int64(4); h <= 5; h++ {
vals, err := rpc.Validators(context.Background(), &h, nil, nil)
assert.NoError(err)
assert.NotNil(vals)
Expand All @@ -720,8 +735,9 @@ func TestValidatorSetHandling(t *testing.T) {
assert.EqualValues(vals.BlockHeight, h)
}

// 8th EndBlock adds validator back
for h := int64(9); h <= 12; h++ {
// 5th EndBlock adds validator back
for h := int64(6); h <= 9; h++ {
<-waitCh
<-waitCh
vals, err := rpc.Validators(context.Background(), &h, nil, nil)
assert.NoError(err)
Expand All @@ -737,7 +753,29 @@ func TestValidatorSetHandling(t *testing.T) {
assert.NotNil(vals)
assert.EqualValues(len(genesisValidators), vals.Total)
assert.Len(vals.Validators, len(genesisValidators))
assert.GreaterOrEqual(vals.BlockHeight, int64(12))
assert.GreaterOrEqual(vals.BlockHeight, int64(9))
}

func createApp(keyToRemove tmcrypto.PrivKey, waitCh chan interface{}, require *require.Assertions) *mocks.Application {
app := &mocks.Application{}
app.On("InitChain", mock.Anything).Return(abci.ResponseInitChain{})
app.On("CheckTx", mock.Anything).Return(abci.ResponseCheckTx{})
app.On("BeginBlock", mock.Anything).Return(abci.ResponseBeginBlock{})
app.On("Commit", mock.Anything).Return(abci.ResponseCommit{})
app.On("GetAppHash", mock.Anything).Return(abci.ResponseGetAppHash{})
app.On("GenerateFraudProof", mock.Anything).Return(abci.ResponseGenerateFraudProof{})

pbValKey, err := encoding.PubKeyToProto(keyToRemove.PubKey())
require.NoError(err)

app.On("EndBlock", mock.Anything).Return(abci.ResponseEndBlock{}).Times(2)
app.On("EndBlock", mock.Anything).Return(abci.ResponseEndBlock{ValidatorUpdates: []abci.ValidatorUpdate{{PubKey: pbValKey, Power: 0}}}).Once()
app.On("EndBlock", mock.Anything).Return(abci.ResponseEndBlock{}).Once()
app.On("EndBlock", mock.Anything).Return(abci.ResponseEndBlock{ValidatorUpdates: []abci.ValidatorUpdate{{PubKey: pbValKey, Power: 100}}}).Once()
app.On("EndBlock", mock.Anything).Return(abci.ResponseEndBlock{}).Run(func(args mock.Arguments) {
waitCh <- nil
})
return app
}

// copy-pasted from store/store_test.go
Expand Down
2 changes: 1 addition & 1 deletion state/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func (e *BlockExecutor) updateState(state types.State, block *types.Block, abciR
// for now, we don't care about part set headers
},
NextValidators: nValSet,
Validators: state.NextValidators.Copy(),
Validators: nValSet,
LastValidators: state.Validators.Copy(),
LastHeightValidatorsChanged: lastHeightValSetChanged,
ConsensusParams: state.ConsensusParams,
Expand Down

0 comments on commit a1d058c

Please sign in to comment.