Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release Candidate 2023.2.5 (dev -> main) #4470

Merged
merged 16 commits into from
Jul 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/service/stagedstreamsync/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type syncProtocol interface {
GetRawBlocksByNumber(ctx context.Context, bns []uint64, opts ...syncproto.Option) ([][]byte, [][]byte, sttypes.StreamID, error)
GetBlockHashes(ctx context.Context, bns []uint64, opts ...syncproto.Option) ([]common.Hash, sttypes.StreamID, error)
GetBlocksByHashes(ctx context.Context, hs []common.Hash, opts ...syncproto.Option) ([]*types.Block, sttypes.StreamID, error)
GetReceipts(ctx context.Context, hs []common.Hash, opts ...syncproto.Option) (receipts []types.Receipts, stid sttypes.StreamID, err error)
GetNodeData(ctx context.Context, hs []common.Hash, opts ...syncproto.Option) (data [][]byte, stid sttypes.StreamID, err error)

RemoveStream(stID sttypes.StreamID) // If a stream delivers invalid data, remove the stream
StreamFailed(stID sttypes.StreamID, reason string)
Expand Down
2 changes: 1 addition & 1 deletion consensus/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (consensus *Consensus) isRightBlockNumAndViewID(recvMsg *FBFTMessage) bool
}

func (consensus *Consensus) onAnnounceSanityChecks(recvMsg *FBFTMessage) bool {
logMsgs := consensus.FBFTLog.GetMessagesByTypeSeqView(
logMsgs := consensus.fBFTLog.GetMessagesByTypeSeqView(
msg_pb.MessageType_ANNOUNCE, recvMsg.BlockNum, recvMsg.ViewID,
)
if len(logMsgs) > 0 {
Expand Down
43 changes: 25 additions & 18 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type VerifyBlockFunc func(*types.Block) error
type Consensus struct {
Decider quorum.Decider
// FBFTLog stores the pbft messages and blocks during FBFT process
FBFTLog *FBFTLog
fBFTLog *FBFTLog
// phase: different phase of FBFT protocol: pre-prepare, prepare, commit, finish etc
phase FBFTPhase
// current indicates what state a node is in
Expand Down Expand Up @@ -84,7 +84,7 @@ type Consensus struct {
// IgnoreViewIDCheck determines whether to ignore viewID check
IgnoreViewIDCheck *abool.AtomicBool
// consensus mutex
mutex sync.RWMutex
mutex *sync.RWMutex
// ViewChange struct
vc *viewChange
// Signal channel for proposing a new block and start new consensus
Expand Down Expand Up @@ -140,6 +140,13 @@ func (consensus *Consensus) Blockchain() core.BlockChain {
return consensus.registry.GetBlockchain()
}

func (consensus *Consensus) FBFTLog() FBFT {
return threadsafeFBFTLog{
log: consensus.fBFTLog,
mu: consensus.mutex,
}
}

// ChainReader returns the chain reader.
// This is mostly the same as Blockchain, but it returns only read methods, so we assume it's safe for concurrent use.
func (consensus *Consensus) ChainReader() engine.ChainReader {
Expand All @@ -165,11 +172,11 @@ func (consensus *Consensus) Beaconchain() core.BlockChain {

// VerifyBlock is a function used to verify the block and keep trace of verified blocks.
func (consensus *Consensus) verifyBlock(block *types.Block) error {
if !consensus.FBFTLog.IsBlockVerified(block.Hash()) {
if !consensus.fBFTLog.IsBlockVerified(block.Hash()) {
if err := consensus.BlockVerifier(block); err != nil {
return errors.Errorf("Block verification failed: %s", err)
}
consensus.FBFTLog.MarkBlockVerified(block)
consensus.fBFTLog.MarkBlockVerified(block)
}
return nil
}
Expand Down Expand Up @@ -261,21 +268,21 @@ func New(
Decider quorum.Decider, minPeers int, aggregateSig bool,
) (*Consensus, error) {
consensus := Consensus{
ShardID: shard,
mutex: &sync.RWMutex{},
ShardID: shard,
fBFTLog: NewFBFTLog(),
phase: FBFTAnnounce,
current: State{mode: Normal},
Decider: Decider,
registry: registry,
MinPeers: minPeers,
AggregateSig: aggregateSig,
host: host,
msgSender: NewMessageSender(host),
BlockNumLowChan: make(chan struct{}, 1),
// FBFT timeout
consensusTimeout: createTimeout(),
}
consensus.Decider = Decider
consensus.registry = registry
consensus.MinPeers = minPeers
consensus.AggregateSig = aggregateSig
consensus.host = host
consensus.msgSender = NewMessageSender(host)
consensus.BlockNumLowChan = make(chan struct{}, 1)
// FBFT related
consensus.FBFTLog = NewFBFTLog()
consensus.phase = FBFTAnnounce
consensus.current = State{mode: Normal}
// FBFT timeout
consensus.consensusTimeout = createTimeout()

if multiBLSPriKey != nil {
consensus.priKey = multiBLSPriKey
Expand Down
15 changes: 6 additions & 9 deletions consensus/consensus_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,9 @@ func (consensus *Consensus) updateBitmaps() {
Str("MessageType", consensus.phase.String()).
Msg("[UpdateBitmaps] Updating consensus bitmaps")
members := consensus.Decider.Participants()
prepareBitmap, _ := bls_cosi.NewMask(members, nil)
commitBitmap, _ := bls_cosi.NewMask(members, nil)
multiSigBitmap, _ := bls_cosi.NewMask(members, nil)
prepareBitmap := bls_cosi.NewMask(members)
commitBitmap := bls_cosi.NewMask(members)
multiSigBitmap := bls_cosi.NewMask(members)
consensus.prepareBitmap = prepareBitmap
consensus.commitBitmap = commitBitmap
consensus.multiSigBitmap = multiSigBitmap
Expand Down Expand Up @@ -575,7 +575,7 @@ func (consensus *Consensus) selfCommit(payload []byte) error {
copy(blockHash[:], payload[:32])

// Leader sign and add commit message
block := consensus.FBFTLog.GetBlockByHash(blockHash)
block := consensus.fBFTLog.GetBlockByHash(blockHash)
if block == nil {
return errGetPreparedBlock
}
Expand Down Expand Up @@ -627,11 +627,8 @@ func (consensus *Consensus) NumSignaturesIncludedInBlock(block *types.Block) uin
count := uint32(0)
members := consensus.Decider.Participants()
// TODO(audit): do not reconstruct the Mask
mask, err := bls.NewMask(members, nil)
if err != nil {
return count
}
err = mask.SetMask(block.Header().LastCommitBitmap())
mask := bls.NewMask(members)
err := mask.SetMask(block.Header().LastCommitBitmap())
if err != nil {
return count
}
Expand Down
3 changes: 1 addition & 2 deletions consensus/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ func TestConsensusInitialization(t *testing.T) {
assert.NoError(t, err)

messageSender := &MessageSender{host: host, retryTimes: int(phaseDuration.Seconds()) / RetryIntervalInSec}
fbtLog := NewFBFTLog()
state := State{mode: Normal}

timeouts := createTimeout()
Expand All @@ -37,7 +36,7 @@ func TestConsensusInitialization(t *testing.T) {
assert.IsType(t, make(chan struct{}), consensus.BlockNumLowChan)

// FBFTLog
assert.Equal(t, fbtLog, consensus.FBFTLog)
assert.NotNil(t, consensus.FBFTLog())

assert.Equal(t, FBFTAnnounce, consensus.phase)

Expand Down
33 changes: 13 additions & 20 deletions consensus/consensus_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,10 @@ func (consensus *Consensus) finalCommit() {
network.Bytes,
network.FBFTMsg
commitSigAndBitmap := FBFTMsg.Payload
consensus.FBFTLog.AddVerifiedMessage(FBFTMsg)
consensus.fBFTLog.AddVerifiedMessage(FBFTMsg)
// find correct block content
curBlockHash := consensus.blockHash
block := consensus.FBFTLog.GetBlockByHash(curBlockHash)
block := consensus.fBFTLog.GetBlockByHash(curBlockHash)
if block == nil {
consensus.getLogger().Warn().
Str("blockHash", hex.EncodeToString(curBlockHash[:])).
Expand Down Expand Up @@ -278,7 +278,7 @@ func (consensus *Consensus) BlockCommitSigs(blockNum uint64) ([]byte, error) {
lastCommits, err := consensus.Blockchain().ReadCommitSig(blockNum)
if err != nil ||
len(lastCommits) < bls.BLSSignatureSizeInBytes {
msgs := consensus.FBFTLog.GetMessagesByTypeSeq(
msgs := consensus.FBFTLog().GetMessagesByTypeSeq(
msg_pb.MessageType_COMMITTED, blockNum,
)
if len(msgs) != 1 {
Expand Down Expand Up @@ -482,7 +482,7 @@ func (consensus *Consensus) getLastMileBlockIter(bnStart uint64, cb func(iter *L
}
return cb(&LastMileBlockIter{
blockCandidates: blocks,
fbftLog: consensus.FBFTLog,
fbftLog: consensus.fBFTLog,
verify: consensus.BlockVerifier,
curIndex: 0,
logger: consensus.getLogger(),
Expand Down Expand Up @@ -513,7 +513,7 @@ func (consensus *Consensus) getLastMileBlocksAndMsg(bnStart uint64) ([]*types.Bl
msgs []*FBFTMessage
)
for blockNum := bnStart; ; blockNum++ {
blk, msg, err := consensus.FBFTLog.GetCommittedBlockAndMsgsFromNumber(blockNum, consensus.getLogger())
blk, msg, err := consensus.fBFTLog.GetCommittedBlockAndMsgsFromNumber(blockNum, consensus.getLogger())
if err != nil {
if err == errFBFTLogNotFound {
break
Expand Down Expand Up @@ -551,7 +551,7 @@ func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error {
network.Bytes,
network.FBFTMsg
bareMinimumCommit := FBFTMsg.Payload
consensus.FBFTLog.AddVerifiedMessage(FBFTMsg)
consensus.fBFTLog.AddVerifiedMessage(FBFTMsg)

if err := consensus.verifyLastCommitSig(bareMinimumCommit, blk); err != nil {
return errors.Wrap(err, "[preCommitAndPropose] failed verifying last commit sig")
Expand All @@ -567,16 +567,16 @@ func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error {
nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)),
},
p2p.ConstructMessage(msgToSend)); err != nil {
consensus.getLogger().Warn().Err(err).Msg("[preCommitAndPropose] Cannot send committed message")
consensus.GetLogger().Warn().Err(err).Msg("[preCommitAndPropose] Cannot send committed message")
} else {
consensus.getLogger().Info().
consensus.GetLogger().Info().
Str("blockHash", blk.Hash().Hex()).
Uint64("blockNum", consensus.BlockNum()).
Hex("lastCommitSig", bareMinimumCommit).
Msg("[preCommitAndPropose] Sent Committed Message")
}

if _, err := consensus.Blockchain().InsertChain([]*types.Block{blk}, !consensus.FBFTLog.IsBlockVerified(blk.Hash())); err != nil {
if _, err := consensus.Blockchain().InsertChain([]*types.Block{blk}, !consensus.FBFTLog().IsBlockVerified(blk.Hash())); err != nil {
consensus.getLogger().Error().Err(err).Msg("[preCommitAndPropose] Failed to add block to chain")
return
}
Expand Down Expand Up @@ -661,7 +661,7 @@ func (consensus *Consensus) tryCatchup() error {

func (consensus *Consensus) commitBlock(blk *types.Block, committedMsg *FBFTMessage) error {
if consensus.Blockchain().CurrentBlock().NumberU64() < blk.NumberU64() {
if _, err := consensus.Blockchain().InsertChain([]*types.Block{blk}, !consensus.FBFTLog.IsBlockVerified(blk.Hash())); err != nil {
if _, err := consensus.Blockchain().InsertChain([]*types.Block{blk}, !consensus.fBFTLog.IsBlockVerified(blk.Hash())); err != nil {
consensus.getLogger().Error().Err(err).Msg("[commitBlock] Failed to add block to chain")
return err
}
Expand Down Expand Up @@ -785,7 +785,7 @@ func (consensus *Consensus) setupForNewConsensus(blk *types.Block, committedMsg
if blk.IsLastBlockInEpoch() {
consensus.setMode(consensus.updateConsensusInformation())
}
consensus.FBFTLog.PruneCacheBeforeBlock(blk.NumberU64())
consensus.fBFTLog.PruneCacheBeforeBlock(blk.NumberU64())
consensus.resetState()
}

Expand Down Expand Up @@ -920,19 +920,12 @@ func (consensus *Consensus) ValidateVdfAndProof(headerObj *block.Header) bool {
func (consensus *Consensus) DeleteBlocksLessThan(number uint64) {
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
consensus.FBFTLog.deleteBlocksLessThan(number)
consensus.fBFTLog.deleteBlocksLessThan(number)
}

// DeleteMessagesLessThan deletes messages less than given block number.
func (consensus *Consensus) DeleteMessagesLessThan(number uint64) {
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
consensus.FBFTLog.deleteMessagesLessThan(number)
}

// DeleteBlockByNumber deletes block by given block number.
func (consensus *Consensus) DeleteBlockByNumber(number uint64) {
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
consensus.FBFTLog.deleteBlockByNumber(number)
consensus.fBFTLog.deleteMessagesLessThan(number)
}
6 changes: 1 addition & 5 deletions consensus/construct.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,7 @@ func (consensus *Consensus) construct(
)
} else {
// TODO: use a persistent bitmap to report bitmap
mask, err := bls.NewMask(consensus.Decider.Participants(), nil)
if err != nil {
utils.Logger().Warn().Err(err).Msg("unable to setup mask for multi-sig message")
return nil, err
}
mask := bls.NewMask(consensus.Decider.Participants())
for _, key := range priKeys {
mask.SetKey(key.Pub.Bytes, true)
}
Expand Down
6 changes: 3 additions & 3 deletions consensus/double_sign.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (consensus *Consensus) checkDoubleSign(recvMsg *FBFTMessage) bool {
); alreadyCastBallot != nil {
for _, pubKey1 := range alreadyCastBallot.SignerPubKeys {
if bytes.Compare(pubKey2.Bytes[:], pubKey1[:]) == 0 {
for _, blk := range consensus.FBFTLog.GetBlocksByNumber(recvMsg.BlockNum) {
for _, blk := range consensus.fBFTLog.GetBlocksByNumber(recvMsg.BlockNum) {
firstSignedHeader := blk.Header()
areHeightsEqual := firstSignedHeader.Number().Uint64() == recvMsg.BlockNum
areViewIDsEqual := firstSignedHeader.ViewID().Uint64() == recvMsg.ViewID
Expand Down Expand Up @@ -138,8 +138,8 @@ func (consensus *Consensus) couldThisBeADoubleSigner(
recvMsg *FBFTMessage,
) bool {
num, hash := consensus.BlockNum(), recvMsg.BlockHash
suspicious := !consensus.FBFTLog.HasMatchingAnnounce(num, hash) ||
!consensus.FBFTLog.HasMatchingPrepared(num, hash)
suspicious := !consensus.fBFTLog.HasMatchingAnnounce(num, hash) ||
!consensus.fBFTLog.HasMatchingPrepared(num, hash)
if suspicious {
consensus.getLogger().Debug().
Str("message", recvMsg.String()).
Expand Down
21 changes: 21 additions & 0 deletions consensus/engine/consensus_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,27 @@ type ChainReader interface {
// Config retrieves the blockchain's chain configuration.
Config() *params.ChainConfig

// TrieNode retrieves a blob of data associated with a trie node
// either from ephemeral in-memory cache, or from persistent storage.
TrieNode(hash common.Hash) ([]byte, error)

// ContractCode retrieves a blob of data associated with a contract
// hash either from ephemeral in-memory cache, or from persistent storage.
//
// If the code doesn't exist in the in-memory cache, check the storage with
// new code scheme.
ContractCode(hash common.Hash) ([]byte, error)

// ValidatorCode retrieves a blob of data associated with a validator
// hash either from ephemeral in-memory cache, or from persistent storage.
//
// If the code doesn't exist in the in-memory cache, check the storage with
// new code scheme.
ValidatorCode(hash common.Hash) ([]byte, error)

// GetReceiptsByHash retrieves the receipts for all transactions in a given block.
GetReceiptsByHash(hash common.Hash) types.Receipts

// CurrentHeader retrieves the current header from the local chain.
CurrentHeader() *block.Header

Expand Down
Loading