Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed infinite loop sync. #4575

Merged
merged 5 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion api/service/stagedstreamsync/staged_stream_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,8 +642,9 @@ func (ss *StagedStreamSync) addConsensusLastMile(bc core.BlockChain, cs *consens
case errors.Is(err, core.ErrNotLastBlockInEpoch):
case err != nil:
return errors.Wrap(err, "failed to InsertChain")
default:
sophoah marked this conversation as resolved.
Show resolved Hide resolved
hashes = append(hashes, block.Header().Hash())
}
hashes = append(hashes, block.Header().Hash())
}
return nil
})
Expand Down
8 changes: 6 additions & 2 deletions api/service/stagedstreamsync/syncing.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ func (s *StagedStreamSync) Debug(source string, msg interface{}) {
// For each iteration, estimate the current block number, then fetch block & insert to blockchain
func (s *StagedStreamSync) doSync(downloaderContext context.Context, initSync bool) (uint64, int, error) {

startedNumber := s.bc.CurrentBlock().NumberU64()

var totalInserted int

s.initSync = initSync
Expand Down Expand Up @@ -249,7 +251,7 @@ func (s *StagedStreamSync) doSync(downloaderContext context.Context, initSync bo
for {
ctx, cancel := context.WithCancel(downloaderContext)

n, err := s.doSyncCycle(ctx, initSync)
n, err := s.doSyncCycle(ctx)
if err != nil {
utils.Logger().Error().
Err(err).
Expand Down Expand Up @@ -281,6 +283,8 @@ func (s *StagedStreamSync) doSync(downloaderContext context.Context, initSync bo
Bool("isBeacon", s.isBeacon).
Uint32("shard", s.bc.ShardID()).
Int("blocks", totalInserted).
Uint64("startedNumber", startedNumber).
Uint64("currentNumber", s.bc.CurrentBlock().NumberU64()).
Msg(WrapStagedSyncMsg("sync cycle blocks inserted successfully"))
}

Expand All @@ -304,7 +308,7 @@ func (s *StagedStreamSync) doSync(downloaderContext context.Context, initSync bo
return estimatedHeight, totalInserted, nil
}

func (s *StagedStreamSync) doSyncCycle(ctx context.Context, initSync bool) (int, error) {
func (s *StagedStreamSync) doSyncCycle(ctx context.Context) (int, error) {

// TODO: initSync=true means currentCycleNumber==0, so we can remove initSync

Expand Down
12 changes: 7 additions & 5 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ const (
AsyncProposal
)

// DownloadAsync is the single-method interface used as the type of the
// Consensus.dHelper field. Its only method, DownloadAsync, takes no arguments
// and returns nothing.
// NOTE(review): presumably the method kicks off a background block download;
// confirm against the concrete implementation.
type DownloadAsync interface {
DownloadAsync()
}

// Consensus is the main struct with all states and data related to consensus process.
type Consensus struct {
Decider quorum.Decider
Expand Down Expand Up @@ -122,9 +126,7 @@ type Consensus struct {
// finalityCounter keep tracks of the finality time
finalityCounter atomic.Value //int64

dHelper interface {
DownloadAsync()
}
dHelper DownloadAsync

// Both flags only for initialization state.
start bool
Expand Down Expand Up @@ -190,10 +192,10 @@ func (consensus *Consensus) BlocksSynchronized() {
}

// BlocksNotSynchronized lets the main loop know that block is not synchronized
func (consensus *Consensus) BlocksNotSynchronized() {
func (consensus *Consensus) BlocksNotSynchronized(reason string) {
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
consensus.syncNotReadyChan()
consensus.syncNotReadyChan(reason)
}

// VdfSeedSize returns the number of VRFs for VDF computation
Expand Down
7 changes: 4 additions & 3 deletions consensus/consensus_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,11 +359,12 @@ func (consensus *Consensus) syncReadyChan() {
}
}

func (consensus *Consensus) syncNotReadyChan() {
consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncNotReadyChan")
func (consensus *Consensus) syncNotReadyChan(reason string) {
mode := consensus.current.Mode()
consensus.setBlockNum(consensus.Blockchain().CurrentHeader().Number().Uint64() + 1)
consensus.current.SetMode(Syncing)
consensus.getLogger().Info().Msg("[ConsensusMainLoop] Node is OUT OF SYNC")
consensus.getLogger().Info().Msgf("[ConsensusMainLoop] syncNotReadyChan, prev %s, reason %s", mode.String(), reason)
consensus.getLogger().Info().Msgf("[ConsensusMainLoop] Node is OUT OF SYNC, reason: %s", reason)
consensusSyncCounterVec.With(prometheus.Labels{"consensus": "out_of_sync"}).Inc()
}

Expand Down
2 changes: 1 addition & 1 deletion consensus/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (dh *downloadHelper) downloadStartedLoop(c *Consensus) {
for {
select {
case <-dh.startedCh:
c.BlocksNotSynchronized()
c.BlocksNotSynchronized("downloadStartedLoop")

case err := <-dh.startedSub.Err():
c.GetLogger().Info().Err(err).Msg("consensus download finished loop closed")
Expand Down
2 changes: 1 addition & 1 deletion consensus/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) {
_, err := consensus.ValidateNewBlock(recvMsg)
if err == nil {
consensus.GetLogger().Info().
Msg("[Announce] Block verified")
Msgf("[Announce] Block verified %d", recvMsg.BlockNum)
}
}()
}
Expand Down
2 changes: 1 addition & 1 deletion core/blockchain_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1608,7 +1608,7 @@ func (bc *BlockChainImpl) insertChain(chain types.Blocks, verifyHeaders bool) (i

switch status {
case CanonStatTy:
logger.Info().Msg("Inserted new block")
logger.Info().Msgf("Inserted new block s: %d e: %d n:%d", block.ShardID(), block.Epoch().Uint64(), block.NumberU64())
coalescedLogs = append(coalescedLogs, logs...)
blockInsertTimer.UpdateSince(bstart)
events = append(events, ChainEvent{block, block.Hash(), logs})
Expand Down
3 changes: 2 additions & 1 deletion core/epochchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ func (bc *EpochChain) InsertChain(blocks types.Blocks, _ bool) (int, error) {
se1()
se2()
utils.Logger().Info().
Msgf("[EPOCHSYNC] Added block %d %s", block.NumberU64(), block.Hash().Hex())
Msgf("[EPOCHSYNC] Added block %d, epoch %d, %s", block.NumberU64(), block.Epoch().Uint64(), block.Hash().Hex())

}
return 0, nil
}
Expand Down
2 changes: 1 addition & 1 deletion core/rawdb/accessors_offchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func WriteShardStateBytes(db DatabaseWriter, epoch *big.Int, data []byte) error
}
utils.Logger().Info().
Str("epoch", epoch.String()).
Int("size", len(data)).Msg("wrote sharding state")
Int("size", len(data)).Msgf("wrote sharding state, epoch %d", epoch.Uint64())
return nil
}

Expand Down
4 changes: 0 additions & 4 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,10 +499,6 @@ func (node *Node) validateNodeMessage(ctx context.Context, payload []byte) (
utils.Logger().Debug().Uint64("receivedNum", block.NumberU64()).
Uint64("currentNum", curBeaconHeight).Msg("beacon block sync message rejected")
return nil, 0, errors.New("beacon block height smaller than current height beyond tolerance")
} else if block.NumberU64()-beaconBlockHeightTolerance > curBeaconHeight {
utils.Logger().Debug().Uint64("receivedNum", block.NumberU64()).
Uint64("currentNum", curBeaconHeight).Msg("beacon block sync message rejected")
return nil, 0, errors.Errorf("beacon block height too much higher than current height beyond tolerance, block %d, current %d, epoch %d , current %d", block.NumberU64(), curBeaconHeight, block.Epoch().Uint64(), curBeaconBlock.Epoch().Uint64())
sophoah marked this conversation as resolved.
Show resolved Hide resolved
} else if block.NumberU64() <= curBeaconHeight {
utils.Logger().Debug().Uint64("receivedNum", block.NumberU64()).
Uint64("currentNum", curBeaconHeight).Msg("beacon block sync message ignored")
Expand Down
9 changes: 5 additions & 4 deletions node/node_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ func (node *Node) HandleNodeMessage(
if node.Blockchain().ShardID() != shard.BeaconChainShardID {
for _, block := range blocks {
if block.ShardID() == 0 {
utils.Logger().Info().
Msgf("Beacon block being handled by block channel: %d", block.NumberU64())
if block.IsLastBlockInEpoch() {
go func(blk *types.Block) {
node.BeaconBlockChannel <- blk
Expand Down Expand Up @@ -339,7 +337,7 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block) error {
}
BroadcastCXReceipts(newBlock, node.Consensus)
} else {
if node.Consensus.Mode() != consensus.Listening {
if mode := node.Consensus.Mode(); mode != consensus.Listening {
numSignatures := node.Consensus.NumSignaturesIncludedInBlock(newBlock)
utils.Logger().Info().
Uint64("blockNum", newBlock.NumberU64()).
Expand All @@ -349,9 +347,12 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block) error {
Int("numTxns", len(newBlock.Transactions())).
Int("numStakingTxns", len(newBlock.StakingTransactions())).
Uint32("numSignatures", numSignatures).
Str("mode", mode.String()).
Msg("BINGO !!! Reached Consensus")
if node.Consensus.Mode() == consensus.Syncing {
node.Consensus.SetMode(node.Consensus.UpdateConsensusInformation())
mode = node.Consensus.UpdateConsensusInformation()
utils.Logger().Info().Msgf("Switching to mode %s", mode)
node.Consensus.SetMode(mode)
}

node.Consensus.UpdateValidatorMetrics(float64(numSignatures), float64(newBlock.NumberU64()))
Expand Down
13 changes: 11 additions & 2 deletions node/node_syncing.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,16 @@ func (node *Node) doBeaconSyncing() {
// If Downloader is not working, we need also deal with blocks from beaconBlockChannel
go func(node *Node) {
// TODO ek – infinite loop; add shutdown/cleanup logic
for _ = range node.BeaconBlockChannel {
for b := range node.BeaconBlockChannel {
if b != nil && b.IsLastBlockInEpoch() {
_, err := node.EpochChain().InsertChain(types.Blocks{b}, true)
if err != nil {
utils.Logger().Error().Err(err).Msgf("[SYNC] InsertChain failed shard: %d epoch:%d number:%d", b.Header().ShardID(), b.Epoch().Uint64(), b.NumberU64())
} else {
utils.Logger().Info().
Msgf("Beacon block being handled by block channel: epoch: %d, number: %d", b.Epoch().Uint64(), b.NumberU64())
}
}
}
}(node)
}
Expand Down Expand Up @@ -307,7 +316,7 @@ func (node *Node) doSync(syncInstance ISync, syncingPeerProvider SyncingPeerProv
if isSynchronized, _, _ := syncInstance.GetParsedSyncStatusDoubleChecked(); !isSynchronized {
node.IsSynchronized.UnSet()
if willJoinConsensus {
consensus.BlocksNotSynchronized()
consensus.BlocksNotSynchronized("node.doSync")
}
isBeacon := bc.ShardID() == shard.BeaconChainShardID
syncInstance.SyncLoop(bc, isBeacon, consensus, legacysync.LoopMinTime)
Expand Down