BUG: Block sync is stuck and cannot be closed #341

Merged · 5 commits · Feb 21, 2023
22 changes: 21 additions & 1 deletion consensus/consensus.go
@@ -9,9 +9,12 @@ import (
"github.com/Qitmeer/qng/core/blockchain"
"github.com/Qitmeer/qng/core/dbnamespace"
"github.com/Qitmeer/qng/core/event"
"github.com/Qitmeer/qng/core/protocol"
"github.com/Qitmeer/qng/database"
"github.com/Qitmeer/qng/engine/txscript"
"github.com/Qitmeer/qng/log"
"github.com/Qitmeer/qng/meerevm/qit"
"github.com/Qitmeer/qng/node/service"
"github.com/Qitmeer/qng/params"
"github.com/Qitmeer/qng/services/index"
"github.com/Qitmeer/qng/vm"
@@ -43,7 +46,8 @@ type consensus struct {
blockchain model.BlockChain
indexManager model.IndexManager

vmService *vm.Service
vmService *vm.Service
qitService *qit.QitService
}

// Init initializes consensus
@@ -84,6 +88,15 @@ func (s *consensus) Init() error {
return err
}
s.vmService = vmService
//
if s.cfg.Qit && params.ActiveNetParams.Net != protocol.MainNet {
ser, err := qit.New(s.cfg, s)
if err != nil {
return err
}
s.qitService = ser
}
//
s.subscribe()
return blockchain.Init()
}
@@ -144,6 +157,13 @@ func (s *consensus) VMService() model.VMI {
return s.vmService
}

func (s *consensus) QitService() service.IService {
if s.qitService == nil {
return nil
}
return s.qitService
}

func (s *consensus) subscribe() {
//ch := make(chan *event.Event)
//sub := s.events.Subscribe(ch)
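
The explicit nil check in the new `QitService()` accessor is load-bearing in Go: returning the `*qit.QitService` field directly would wrap a nil pointer in a non-nil `service.IService` value, so the `!= nil` guard in `node/full.go` below would wrongly pass and a nil service would get registered. A minimal sketch of that typed-nil pitfall, using stand-in types rather than the real `service.IService`:

```go
package main

import "fmt"

// Stand-ins for service.IService and qit.QitService, for illustration only.
type IService interface{ Start() error }

type QitService struct{}

func (q *QitService) Start() error { return nil }

func getService() IService {
	var q *QitService // nil pointer: the service was never constructed
	return q          // non-nil interface value holding a nil *QitService
}

func main() {
	fmt.Println(getService() == nil) // false: the interface carries type info
}
```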
2 changes: 2 additions & 0 deletions consensus/model/consensus.go
@@ -6,6 +6,7 @@ import (
"github.com/Qitmeer/qng/core/event"
"github.com/Qitmeer/qng/database"
"github.com/Qitmeer/qng/engine/txscript"
"github.com/Qitmeer/qng/node/service"
"github.com/Qitmeer/qng/params"
)

@@ -26,4 +27,5 @@ type Consensus interface {
Params() *params.Params
VMService() VMI
Rebuild() error
QitService() service.IService
}
12 changes: 5 additions & 7 deletions node/full.go
@@ -146,10 +146,6 @@ func (qm *QitmeerFull) RegisterAccountService(cfg *config.Config) error {
return nil
}

func (qm *QitmeerFull) RegisterVMService(vmService *vm.Service) error {
return qm.Services().RegisterService(vmService)
}

func (qm *QitmeerFull) RegisterQitSubnet() error {
if !qm.node.Config.Qit ||
params.ActiveNetParams.Net == protocol.MainNet {
@@ -262,7 +258,7 @@ func newQitmeerFullNode(node *Node) (*QitmeerFull, error) {
// init address api
qm.addressApi = address.NewAddressApi(cfg, node.Params, qm.GetBlockChain())

if err := qm.RegisterVMService(node.consensus.VMService().(*vm.Service)); err != nil {
if err := qm.Services().RegisterService(node.consensus.VMService().(*vm.Service)); err != nil {
return nil, err
}
vms := qm.GetVMService()
@@ -273,8 +269,10 @@ func newQitmeerFullNode(node *Node) (*QitmeerFull, error) {
return nil, err
}

if err := qm.RegisterQitSubnet(); err != nil {
return nil, err
if qm.node.consensus.QitService() != nil {
if err := qm.Services().RegisterService(qm.node.consensus.QitService()); err != nil {
return nil, err
}
}

if err := qm.RegisterRpcService(); err != nil {
82 changes: 22 additions & 60 deletions p2p/synch/getblockdatas.go
@@ -17,7 +17,6 @@ import (
pb "github.com/Qitmeer/qng/p2p/proto/v1"
libp2pcore "github.com/libp2p/go-libp2p/core"
"github.com/libp2p/go-libp2p/core/peer"
"sync/atomic"
"time"
)

@@ -163,17 +162,20 @@ func (s *Sync) getMerkleBlockDataHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) *common.Error {
return nil
}

func (ps *PeerSync) processGetBlockDatas(pe *peers.Peer, blocks []*hash.Hash) error {
func (ps *PeerSync) processGetBlockDatas(pe *peers.Peer, blocks []*hash.Hash) *ProcessResult {
if !ps.isSyncPeer(pe) || !pe.IsConnected() {
err := fmt.Errorf("no sync peer")
log.Trace(err.Error())
return err
err := fmt.Errorf("no sync peer:%v", pe.GetID())
log.Trace(err.Error(), "processID", ps.processID)
return nil
}
blocksReady := []*hash.Hash{}
blockDatas := []*BlockData{}
blockDataM := map[hash.Hash]*BlockData{}

for _, b := range blocks {
if ps.IsInterrupt() {
return nil
}
if ps.sy.p2p.BlockChain().BlockDAG().HasBlock(b) {
continue
}
@@ -196,28 +198,20 @@ func (ps *PeerSync) processGetBlockDatas(pe *peers.Peer, blocks []*hash.Hash) error {
blocksReady = append(blocksReady, b)
}
if len(blockDatas) <= 0 {
ps.continueSync(false)
return nil
}
if !ps.longSyncMod {
bs := ps.sy.p2p.BlockChain().BestSnapshot()
if pe.GraphState().GetTotal() >= bs.GraphState.GetTotal()+MaxBlockLocatorsPerMsg {
ps.longSyncMod = true
}
return &ProcessResult{act: ProcessResultActionContinue, orphan: false}
}
if len(blocksReady) > 0 {
log.Trace(fmt.Sprintf("processGetBlockDatas::sendGetBlockDataRequest peer=%v, blocks=%d [%s -> %s] ", pe.GetID(), len(blocksReady), blocksReady[0], blocksReady[len(blocksReady)-1]))
log.Trace(fmt.Sprintf("processGetBlockDatas::sendGetBlockDataRequest peer=%v, blocks=%d [%s -> %s] ", pe.GetID(), len(blocksReady), blocksReady[0], blocksReady[len(blocksReady)-1]), "processID", ps.processID)
bd, err := ps.sy.sendGetBlockDataRequest(ps.sy.p2p.Context(), pe.GetID(), &pb.GetBlockDatas{Locator: changeHashsToPBHashs(blocksReady)})
if err != nil {
log.Warn(fmt.Sprintf("getBlocks send:%v", err))
go ps.TryAgainUpdateSyncPeer()
return err
log.Warn(fmt.Sprintf("getBlocks send:%v", err), "processID", ps.processID)
return &ProcessResult{act: ProcessResultActionTryAgain}
}
log.Trace(fmt.Sprintf("Received:Locator=%d", len(bd.Locator)))
log.Trace(fmt.Sprintf("Received:Locator=%d", len(bd.Locator)), "processID", ps.processID)
for _, b := range bd.Locator {
block, err := types.NewBlockFromBytes(b.BlockBytes)
if err != nil {
log.Warn(fmt.Sprintf("getBlocks from:%v", err))
log.Warn(fmt.Sprintf("getBlocks from:%v", err), "processID", ps.processID)
break
}
bd, ok := blockDataM[*block.Hash()]
@@ -231,21 +225,19 @@ func (ps *PeerSync) processGetBlockDatas(pe *peers.Peer, blocks []*hash.Hash) error {
add := 0
hasOrphan := false

lastSync := ps.lastSync

for i, b := range blockDatas {
if atomic.LoadInt32(&ps.shutdown) != 0 {
break
if ps.IsInterrupt() {
return nil
}
block := b.Block
if block == nil {
log.Trace(fmt.Sprintf("No block bytes:%d : %s", i, b.Hash.String()))
log.Trace(fmt.Sprintf("No block bytes:%d : %s", i, b.Hash.String()), "processID", ps.processID)
continue
}
//
IsOrphan, _, err := ps.sy.p2p.BlockChain().ProcessBlock(block, behaviorFlags)
if err != nil {
log.Error(fmt.Sprintf("Failed to process block:hash=%s err=%s", block.Hash(), err))
log.Error(fmt.Sprintf("Failed to process block:hash=%s err=%s", block.Hash(), err), "processID", ps.processID)
continue
}
if IsOrphan {
@@ -257,28 +249,17 @@ func (ps *PeerSync) processGetBlockDatas(pe *peers.Peer, blocks []*hash.Hash) error {
add++
ps.lastSync = time.Now()
}
log.Debug(fmt.Sprintf("getBlockDatas:%d/%d spend:%s", add, len(blockDatas), time.Since(lastSync).Truncate(time.Second).String()))
log.Debug(fmt.Sprintf("getBlockDatas:%d/%d", add, len(blockDatas)), "processID", ps.processID)

var err error
if add > 0 {
ps.sy.p2p.TxMemPool().PruneExpiredTx()

if ps.longSyncMod {
if ps.IsCompleteForSyncPeer() {
log.Info("Your synchronization has been completed.")
ps.longSyncMod = false
}

if ps.IsCurrent() {
log.Info("You're up to date now.")
ps.longSyncMod = false
}
}
} else {
err = fmt.Errorf("no get blocks")
log.Debug(err.Error(), "processID", ps.processID)
return &ProcessResult{act: ProcessResultActionTryAgain}
}
ps.continueSync(hasOrphan)
return err
return &ProcessResult{act: ProcessResultActionContinue, orphan: hasOrphan, add: add}
}
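
`processGetBlockDatas` now reports back to the sync loop through a `ProcessResult` instead of an `error` plus side calls like `continueSync` and `TryAgainUpdateSyncPeer`. The type itself is not part of this diff; inferred from the call sites above, its shape is presumably close to the following sketch (only the names that appear in the calls are taken from the source, the rest is assumed):

```go
// Assumed shape of ProcessResult; the real definition lives elsewhere
// in p2p/synch and may differ.
type ProcessResultAction int

const (
	ProcessResultActionContinue ProcessResultAction = iota // keep syncing with this peer
	ProcessResultActionTryAgain                            // pick another sync peer and retry
)

type ProcessResult struct {
	act    ProcessResultAction // what the sync loop should do next
	orphan bool                // an orphan block was encountered
	add    int                 // number of blocks accepted in this round
}
```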

func (ps *PeerSync) processGetMerkleBlockDatas(pe *peers.Peer, blocks []*hash.Hash) error {
@@ -306,12 +287,6 @@ func (ps *PeerSync) processGetMerkleBlockDatas(pe *peers.Peer, blocks []*hash.Hash) error {
if len(blocksReady) <= 0 {
return nil
}
if !ps.longSyncMod {
bs := ps.sy.p2p.BlockChain().BestSnapshot()
if pe.GraphState().GetTotal() >= bs.GraphState.GetTotal()+MaxBlockLocatorsPerMsg {
ps.longSyncMod = true
}
}

bd, err := ps.sy.sendGetMerkleBlockDataRequest(ps.sy.p2p.Context(), pe.GetID(), &pb.MerkleBlockRequest{Hashes: changeHashsToPBHashs(blocksReady)})
if err != nil {
@@ -322,15 +297,6 @@ func (ps *PeerSync) processGetMerkleBlockDatas(pe *peers.Peer, blocks []*hash.Hash) error {
return nil
}

func (ps *PeerSync) GetBlockDatas(pe *peers.Peer, blocks []*hash.Hash) {
// Ignore if we are shutting down.
if atomic.LoadInt32(&ps.shutdown) != 0 {
return
}

ps.msgChan <- &GetBlockDatasMsg{pe: pe, blocks: blocks}
}

// handleGetData is invoked when a peer receives a getdata qitmeer message and
// is used to deliver block and transaction information.
func (ps *PeerSync) OnGetData(sp *peers.Peer, invList []*pb.InvVect) error {
@@ -360,11 +326,7 @@ func (ps *PeerSync) OnGetData(sp *peers.Peer, invList []*pb.InvVect) error {
}
}
if len(blocks) > 0 {
err := ps.processGetBlockDatas(sp, changePBHashsToHashs(blocks))
if err != nil {
log.Info("processGetBlockDatas Error", "err", err.Error())
return err
}
ps.processGetBlockDatas(sp, changePBHashsToHashs(blocks))
}
if len(merkleBlocks) > 0 {
err := ps.processGetMerkleBlockDatas(sp, changePBHashsToHashs(merkleBlocks))
34 changes: 7 additions & 27 deletions p2p/synch/getblocks.go
@@ -15,7 +15,6 @@ import (
pb "github.com/Qitmeer/qng/p2p/proto/v1"
libp2pcore "github.com/libp2p/go-libp2p/core"
"github.com/libp2p/go-libp2p/core/peer"
"sync/atomic"
)

func (s *Sync) sendGetBlocksRequest(ctx context.Context, id peer.ID, blocks *pb.GetBlocks) (*pb.DagBlocks, error) {
@@ -65,39 +64,20 @@ func (s *Sync) getBlocksHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) *common.Error {
return nil
}

func (ps *PeerSync) processGetBlocks(pe *peers.Peer, blocks []*hash.Hash) error {
if len(blocks) <= 0 {
return fmt.Errorf("no blocks")
}
if !ps.isSyncPeer(pe) || !pe.IsConnected() {
return fmt.Errorf("no sync peer")
}

func (ps *PeerSync) processGetBlocks(pe *peers.Peer, blocks []*hash.Hash) *ProcessResult {
db, err := ps.sy.sendGetBlocksRequest(ps.sy.p2p.Context(), pe.GetID(), &pb.GetBlocks{Locator: changeHashsToPBHashs(blocks)})
if err != nil {
return err
log.Warn(err.Error(), "processID", ps.processID)
return nil
}
if len(db.Blocks) <= 0 {
log.Warn("no block need to get")
log.Warn("no block need to get", "processID", ps.processID)
return nil
}
go ps.GetBlockDatas(pe, changePBHashsToHashs(db.Blocks))
return err
}

func (ps *PeerSync) GetBlocks(pe *peers.Peer, blocks []*hash.Hash) {
if pe == nil {
return
}
// Ignore if we are shutting down.
if atomic.LoadInt32(&ps.shutdown) != 0 {
return
}
if len(blocks) == 1 {
ps.GetBlockDatas(pe, blocks)
return
if ps.IsInterrupt() {
return nil
}
ps.msgChan <- &GetBlocksMsg{pe: pe, blocks: blocks}
return ps.processGetBlockDatas(pe, changePBHashsToHashs(db.Blocks))
}

func (s *Sync) GetDataHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) *common.Error {
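
With the `GetBlocks` helper and its `msgChan` hand-off removed, `processGetBlocks` now calls `processGetBlockDatas` synchronously, and the `atomic.LoadInt32(&ps.shutdown)` checks are replaced by `IsInterrupt()`. That method is not shown in this diff; a plausible sketch, assuming the shutdown flag survives the refactor:

```go
package synch // illustrative placement

import "sync/atomic"

// Plausible sketch only: IsInterrupt presumably reports whether the sync
// process should stop, e.g. by reading the same shutdown flag that the
// deleted atomic.LoadInt32 checks used.
type PeerSync struct {
	shutdown int32 // set non-zero once shutdown begins (assumed field)
}

func (ps *PeerSync) IsInterrupt() bool {
	return atomic.LoadInt32(&ps.shutdown) != 0
}
```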
6 changes: 4 additions & 2 deletions p2p/synch/graphstate.go
@@ -73,9 +73,11 @@ func (s *Sync) graphStateHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) *common.Error {
func (ps *PeerSync) processUpdateGraphState(pe *peers.Peer) error {
if !pe.IsConnected() {
err := fmt.Errorf("peer is not active")
log.Trace(err.Error())
log.Warn(err.Error())
return err
}
log.Trace(fmt.Sprintf("UpdateGraphState recevied from %v, state=%v ", pe.GetID(), pe.GraphState()))

gs, err := ps.sy.sendGraphStateRequest(ps.sy.p2p.Context(), pe, ps.sy.getGraphState())
if err != nil {
log.Warn(err.Error())
@@ -92,6 +94,6 @@ func (ps *PeerSync) UpdateGraphState(pe *peers.Peer) {
return
}
pe.RunRate(UpdateGraphState, UpdateGraphStateTime, func() {
ps.msgChan <- &UpdateGraphStateMsg{pe: pe}
ps.processUpdateGraphState(pe)
})
}
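
`UpdateGraphState` now invokes `processUpdateGraphState` directly inside the `RunRate` callback instead of queueing an `UpdateGraphStateMsg` on `msgChan`. `RunRate` is not shown here; judging from the call, it throttles the named action to at most one run per `UpdateGraphStateTime`. A hypothetical sketch of that pattern:

```go
package peers // illustrative placement

import (
	"sync"
	"time"
)

// Hypothetical per-peer rate limiter matching the RunRate call above;
// the actual implementation in the peers package may differ.
type Peer struct {
	mu      sync.Mutex
	lastRun map[string]time.Time
}

func (p *Peer) RunRate(name string, interval time.Duration, fn func()) {
	p.mu.Lock()
	if last, ok := p.lastRun[name]; ok && time.Since(last) < interval {
		p.mu.Unlock()
		return // throttled: this action ran too recently
	}
	if p.lastRun == nil {
		p.lastRun = map[string]time.Time{}
	}
	p.lastRun[name] = time.Now()
	p.mu.Unlock()
	fn() // run the action outside the lock
}
```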
3 changes: 1 addition & 2 deletions p2p/synch/mempool.go
@@ -61,8 +61,7 @@ func (s *Sync) HandlerMemPool(ctx context.Context, msg interface{}, stream libp2pcore.Stream) *common.Error {
if mpr.TxsNum == curCount || curCount == 0 {
return nil
}
s.peerSync.msgChan <- &OnMsgMemPool{pe: pe, data: &MsgMemPool{}}

go s.peerSync.OnMemPool(pe, &MsgMemPool{})
return nil
}

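
Taken together, these changes retire the single `msgChan` dispatch path: the `GetBlockDatasMsg`, `GetBlocksMsg`, `UpdateGraphStateMsg`, and `OnMsgMemPool` sends are all gone, replaced by direct or `go`-dispatched calls guarded by `IsInterrupt()`. Assuming the old design drained `msgChan` in one worker goroutine, a single stalled handler would block every queued message and keep shutdown from completing, which matches the reported symptom of a block sync that is stuck and cannot be closed. A sketch of that assumed pre-fix pattern:

```go
package synch // illustrative placement

// Assumed shape of the pre-fix dispatch loop (not part of this diff): one
// goroutine drains msgChan, so a slow or blocked handler stalls every queued
// sync message and delays shutdown indefinitely.
type PeerSync struct {
	msgChan chan interface{} // carried GetBlocksMsg, GetBlockDatasMsg, etc.
	quit    chan struct{}    // hypothetical quit channel
}

func (ps *PeerSync) messageHandler() {
	for {
		select {
		case m := <-ps.msgChan:
			ps.handleMessage(m) // hypothetical dispatcher; if it hangs, sync is stuck
		case <-ps.quit:
			return
		}
	}
}

func (ps *PeerSync) handleMessage(m interface{}) {
	// switch on the concrete message type and run the matching handler (assumed)
}
```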