diff --git a/cmd/arb b/cmd/arb index 3c68b9f56b..6706c44718 160000 --- a/cmd/arb +++ b/cmd/arb @@ -1 +1 @@ -Subproject commit 3c68b9f56b04a7cbbc370caf939389cd42ff3af5 +Subproject commit 6706c447187268524e93e873d5c8b197bb0d5606 diff --git a/eth/fetcher/block_fetcher.go b/eth/fetcher/block_fetcher.go index 51869128dd..2c2cb70898 100644 --- a/eth/fetcher/block_fetcher.go +++ b/eth/fetcher/block_fetcher.go @@ -604,9 +604,9 @@ func (f *BlockFetcher) loop() { // Split the batch of headers into unknown ones (to return to the caller), // known incomplete ones (requiring body retrievals) and completed blocks. unknown, incomplete, complete, lightHeaders := []*types.Header{}, []*blockAnnounce{}, []*types.Block{}, []*blockAnnounce{} - for _, header := range task.headers { + for _, header := range task.headers { //新区块从Peer处下载成功 hash := header.Hash() - + //log.Info("===> Block handleBlockAnnounces Request Arrive", "Header Num", header.Number, "Validator", header.Coinbase.String(), "Peer", task.peer) // Filter fetcher-requested headers from other synchronisation algorithms if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil { // If the delivered header does not match the promised number, drop the announcer @@ -801,6 +801,8 @@ func (f *BlockFetcher) enqueue(peer string, header *types.Header, block *types.B } else { hash, number = block.Hash(), block.NumberU64() } + //sylarChange + eth.GetPeerManagerInstance().ReceiveNewBlockMsg(peer, block.NumberU64(), block.Header().Coinbase.String(), block) // Ensure the peer isn't DOSing us count := f.queues[peer] + 1 if count > blockLimit { diff --git a/eth/handler.go b/eth/handler.go index 028a16b7df..623e0dcf93 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -155,7 +155,6 @@ type handler struct { BaseHash common.Hash directCh chan []*types.Transaction BlockBroadcastInterval int - blockBroadcastTimer *time.Timer //sylarChange //end } @@ -170,7 +169,7 @@ func newHandler(config *handlerConfig) (*handler, error) { } //sylarChange - blockBroadcastTimer := time.NewTimer(time.Duration(config.BlockBroadcastInterval) * time.Second) + eth.GetPeerManagerInstance().CreateBlockInfoBroadcastScheduler(config.BlockBroadcastInterval) h := &handler{ networkID: config.Network, @@ -192,7 +191,6 @@ func newHandler(config *handlerConfig) (*handler, error) { BaseHash: config.BaseHash, BlockBroadcastInterval: config.BlockBroadcastInterval, directCh: make(chan []*types.Transaction, 1000), - blockBroadcastTimer: blockBroadcastTimer, //sylarChange //end } if config.Sync == downloader.FullSync { @@ -316,8 +314,7 @@ func newHandler(config *handlerConfig) (*handler, error) { } return 0, nil } - //sylarChange - h.broadcastPeerBlockInfo(config, blocks) + n, err := h.chain.InsertChain(blocks) if err == nil { atomic.StoreUint32(&h.acceptTxs, 1) // Mark initial sync done on any fetcher import @@ -337,7 +334,7 @@ func newHandler(config *handlerConfig) (*handler, error) { h.chainSync = newChainSyncer(h) //sylarChange eth.GetPeerFilter().Register(h.removePeer) - + eth.GetPeerManagerInstance().RegisterBroadcastBlockHandlerFunc(h.BroadcastBlock) return h, nil } @@ -414,7 +411,6 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { return err } defer h.unregisterPeer(peer.ID()) - p := h.peers.peer(peer.ID()) if p == nil { return errors.New("peer dropped during handling") @@ -537,7 +533,10 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { //sylarChange eth.GetPeerFilter().NotifyWithPeer(peer, 0) - + eth.GetPeerManagerInstance().RegisterPeer(peer) + defer func() { + eth.GetPeerManagerInstance().UnregisterPeer(peer.ID()) + }() // Handle incoming messages until the connection is torn down return handler(peer) } @@ -868,26 +867,3 @@ func (h *handler) txBroadcastLoopDirect() { } } } - -// sylarChange // 定时x秒给Peer发送Block信息防止掉线 -func (h *handler) broadcastPeerBlockInfo(config *handlerConfig, blocks types.Blocks) { //sylarChange-warning 如果Timer到了时间只需要遍历一次blocks,发送Peer信息 && 记录blocksNum,省去一次遍历时间消耗 - var hasBroadcastToPeer bool = false - select { - case <-h.blockBroadcastTimer.C: - log.Info("🍻🍻🍻 Sylar Log:定时器开始发送Block信息到Peer") - hasBroadcastToPeer = true - for i, _ := range blocks { - log.Debug("blockBroadcastTimer arrive", "BlockBroadcastInterval", config.BlockBroadcastInterval) - h.BroadcastBlock(blocks[i], true) - eth.GetPeerFilter().PutNumber(blocks[i].NumberU64(), blocks[i].Difficulty()) - } - h.blockBroadcastTimer.Reset(time.Duration(config.BlockBroadcastInterval) * time.Second) - default: - log.Debug("blockBroadcastTimer not arrive", "number", blocks[0].Number()) - } - if hasBroadcastToPeer == false { - for i, _ := range blocks { - eth.GetPeerFilter().PutNumber(blocks[i].NumberU64(), blocks[i].Difficulty()) - } - } -} diff --git a/eth/handler_eth.go b/eth/handler_eth.go index 43c65d48a9..ea5391c3ea 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -59,7 +59,7 @@ func (h *ethHandler) AcceptTxs() bool { // Handle is invoked from a peer's message handler when it receives a new remote // message that the handler couldn't consume and serve itself. -func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error { +func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error { //远程Peer发送消息处处理地方 // Consume any broadcasts and announces, forwarding the rest to the downloader switch packet := packet.(type) { case *eth.NewBlockHashesPacket: @@ -121,11 +121,10 @@ func (h *ethHandler) handleBlockAnnounces(peer *eth.Peer, hashes []common.Hash, // handleBlockBroadcast is invoked from a peer's message handler when it transmits a // block broadcast for the local node to process. -func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, block *types.Block, td *big.Int) error { - +func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, block *types.Block, td *big.Int) error { //新区块消息 //sylarChange eth.GetPeerFilter().PutTotal(block.NumberU64(), td) - + //log.Info("===> handleBlockBroadcast", "BlockNum", block.NumberU64(), "Peer", peer.ID(), "Validator", block.Header().Coinbase.String()) // Drop all incoming block announces from the p2p network if // the chain already entered the pos stage and disconnect the // remote peer. diff --git a/eth/protocols/eth/handler.go b/eth/protocols/eth/handler.go index 0f6a0c0a87..b70b9fa7a8 100644 --- a/eth/protocols/eth/handler.go +++ b/eth/protocols/eth/handler.go @@ -212,6 +212,13 @@ func handleMessage(backend Backend, peer *Peer) error { metrics.GetOrRegisterHistogramLazy(h, nil, sampler).Update(time.Since(start).Microseconds()) }(time.Now()) } + if msg.Code == TransactionsMsg || msg.Code == PooledTransactionsMsg { + GetPeerManagerInstance().BumpTxEventTimes(&PeerNodeInfo{ + ID: peer.ID(), + Address: peer.RemoteAddr().String(), + PeerID: peer.Node().URLv4(), + }) + } if handler := handlers[msg.Code]; handler != nil { return handler(backend, msg, peer) } diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index ea91edea18..4fa1ad89cd 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -514,9 +514,11 @@ func handleTransactions(backend Backend, msg Decoder, peer *Peer) error { incomingTxInfo := &Global.IncomingTxChannelInfoModel{ Txs: txs, PeerEncodeID: peer.Peer.Node().URLv4(), + ID: peer.Peer.ID().String(), + Address: peer.Peer.RemoteAddr().String(), } IncomingTx <- incomingTxInfo - log.Debug("🍻🍻🍻 接收到新TX", "addr", peer.RemoteAddr(), "id", peer.ID()) + //log.Debug("🍻🍻🍻 接收到新TX", "addr", peer.RemoteAddr(), "id", peer.ID()) } for i, tx := range txs { // Validate and mark the remote transaction @@ -545,9 +547,11 @@ func handlePooledTransactions66(backend Backend, msg Decoder, peer *Peer) error incomingTxInfo := &Global.IncomingTxChannelInfoModel{ Txs: txs.PooledTransactionsPacket, PeerEncodeID: peer.Peer.Node().URLv4(), + ID: peer.Peer.ID().String(), + Address: peer.Peer.RemoteAddr().String(), } IncomingTx <- incomingTxInfo - log.Debug("🍻🍻🍻 接收到新TX", "addr", peer.RemoteAddr(), "id", peer.ID()) + //log.Debug("🍻🍻🍻 接收到新TX", "addr", peer.RemoteAddr(), "id", peer.ID()) } for i, tx := range txs.PooledTransactionsPacket { // Validate and mark the remote transaction diff --git a/eth/protocols/eth/peer_filter.go b/eth/protocols/eth/peer_filter.go index 77689b35b8..54fe7a74a8 100644 --- a/eth/protocols/eth/peer_filter.go +++ b/eth/protocols/eth/peer_filter.go @@ -73,8 +73,7 @@ func (pF *PeerFilter) NotifyWithPeer(peer *Peer, number uint64) { default: log.Error("PeerFilter Notify block") } - //记录Peer到本地 - GetPeerManagerInstance().savePeerInfo(peer) + } func (pF *PeerFilter) Start() { @@ -127,6 +126,11 @@ func (pF *PeerFilter) loop() { } } } + +func (pF *PeerFilter) AddStaticPeer(peerID string) { + pF.excludePeers[peerID] = struct{}{} +} + func (pF *PeerFilter) SetP2PServer(p2pServer *p2p.Server) { pF.p2pServer = p2pServer } diff --git a/eth/protocols/eth/peer_manager.go b/eth/protocols/eth/peer_manager.go index 3f7c33d854..8b09fe4715 100644 --- a/eth/protocols/eth/peer_manager.go +++ b/eth/protocols/eth/peer_manager.go @@ -1,38 +1,294 @@ package eth import ( - "errors" + "github.com/ethereum/go-ethereum/cmd/arb/Global" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "github.com/go-co-op/gocron" "sync" + "sync/atomic" + "time" ) -type peerManager struct { +type PeerNodeInfo struct { + ID string + Address string + PeerID string //EncodeID +} + +type PeerEventInfo struct { + Last time.Time + EventTimes int64 + Address string + PeerID string +} + +type PeerManager struct { //nodeID:encodeID - peers map[string]string // all peers , including removed peer + peers map[string]*PeerNodeInfo // all peers + lock sync.RWMutex + blockInfoLock sync.RWMutex + peerEventCal map[string]*PeerEventInfo + peerBlockCal map[string]map[string]int //{validator:{encode:times}} + peerBlockTimeCal map[string]int //val-encode:times + peersExistAllTimes map[string]*PeerNodeInfo //所有Peer,包括移除的 + firstTimeTrigger bool + LastCheckTime time.Time + txReceiveTimes int64 + currentBlock *types.Block + broadcastBlockFunc func(block *types.Block, propagate bool) + blockInfoRecordLastTime time.Time } -var instance *peerManager +var instance *PeerManager var once sync.Once -func GetPeerManagerInstance() *peerManager { +func GetPeerManagerInstance() *PeerManager { once.Do(func() { - instance = &peerManager{ - peers: make(map[string]string), + instance = &PeerManager{ + peers: make(map[string]*PeerNodeInfo), + peersExistAllTimes: make(map[string]*PeerNodeInfo), + peerEventCal: make(map[string]*PeerEventInfo), + peerBlockCal: make(map[string]map[string]int), + peerBlockTimeCal: make(map[string]int), } }) return instance } -func (p *peerManager) savePeerInfo(peer *Peer) { - //if _, foundedPeer := p.peers[peer.ID()]; !foundedPeer { - // //储存Peer信息 - // p.peers[peer.ID()] = peer.Peer.Node().URLv4() - // log.Debug("Save Peer Info", peer.Peer.Node().URLv4()) +func (manager *PeerManager) RegisterPeer(peer *Peer) { + manager.lock.Lock() + defer manager.lock.Unlock() + id := peer.ID() + peerInfo := &PeerNodeInfo{ + ID: peer.Peer.ID().String(), + Address: peer.Peer.RemoteAddr().String(), + PeerID: peer.Peer.Node().URLv4(), + } + manager.peersExistAllTimes[id] = peerInfo + if _, ok := manager.peers[id]; ok { + return + } + manager.peers[id] = peerInfo +} + +func (manager *PeerManager) UnregisterPeer(id string) { + manager.lock.Lock() + defer manager.lock.Unlock() + _, ok := manager.peers[id] + _, ok2 := manager.peerEventCal[id] + if ok2 { + delete(manager.peerEventCal, id) + } + if !ok { + return + } + delete(manager.peers, id) +} + +func (manager *PeerManager) BumpTxEventTimes(peerInfo *PeerNodeInfo) { + manager.lock.Lock() + defer manager.lock.Unlock() + if manager.firstTimeTrigger == false { + manager.firstTimeTrigger = true + manager.checkEvent() + } + nowTime := time.Now() + if manager.LastCheckTime.IsZero() { + manager.LastCheckTime = nowTime + } + atomic.AddInt64(&manager.txReceiveTimes, 1) + if info, ok := manager.peerEventCal[peerInfo.ID]; !ok { + manager.peerEventCal[peerInfo.ID] = &PeerEventInfo{ + Last: nowTime, + EventTimes: 1, + Address: peerInfo.Address, + PeerID: peerInfo.PeerID, + } + } else { + info.EventTimes += 1 + info.Last = nowTime + } +} + +func (manager *PeerManager) checkEvent() { + //timezone, _ := time.LoadLocation("Asia/Shanghai") + //s := gocron.NewScheduler(timezone) + //s.Every(20).Seconds().Do(func() { + // manager.lock.Lock() + // defer manager.lock.Unlock() + // fmt.Println("Event Cal:") + // fmt.Printf("Peer Count:%d, Event Cal Count:%d\n", len(manager.peers), len(manager.peerEventCal)) + // for _, eventInfo := range manager.peerEventCal { + // formatTimeStr := Global.GetStringTime(eventInfo.Last) + // fmt.Printf("Peer IP:%s,Tx Event Times:%d,Last Time:%s\n", eventInfo.Address, eventInfo.EventTimes, formatTimeStr) + // } + // for _, peer := range manager.peers { + // if _, ok := manager.peerEventCal[peer.ID]; !ok { + // fmt.Printf("Peer Not In Event Cal , IP:%s\n", peer.Address) + // } + // } + //}) + //s.StartAsync() +} + +func (manager *PeerManager) PeersAll() []*PeerNodeInfo { + manager.lock.RLock() + defer manager.lock.RUnlock() + list := make([]*PeerNodeInfo, 0, len(manager.peers)) + for _, p := range manager.peers { + list = append(list, p) + } + return list +} + +func (manager *PeerManager) PeerTxEventsCounter() []*PeerEventInfo { + manager.lock.RLock() + defer manager.lock.RUnlock() + list := make([]*PeerEventInfo, 0, len(manager.peerEventCal)) + for _, p := range manager.peerEventCal { + list = append(list, p) + } + return list +} + +func (manager *PeerManager) PeerTxEventsCounterMap() map[string]*PeerEventInfo { + manager.lock.RLock() + defer manager.lock.RUnlock() + m := make(map[string]*PeerEventInfo) + for k, p := range manager.peerEventCal { + m[k] = p + } + return m +} + +// 储存第一个发送新Block的Peer数据 +func (manager *PeerManager) ReceiveNewBlockMsg(fromPeer string, blockNum uint64, validator string, block *types.Block) { + if validator == "" || len(validator) == 0 { + return + } + if fromPeer == "" || len(fromPeer) == 0 { + return + } + if block == nil { + return + } + if manager.currentBlock != nil { + //同一个BlockNum 只接收最快的 + if manager.currentBlock.NumberU64() == blockNum { + return + } + } + //log.Info("===> handleBlockAnnounces ReceiveNewBlockMsg", "BlockNum", blockNum, "Peer", fromPeer, "Validator", block.Coinbase().String()) + manager.blockInfoLock.Lock() + defer manager.blockInfoLock.Unlock() + if manager.blockInfoRecordLastTime.IsZero() { + manager.blockInfoRecordLastTime = time.Now() + } + //Block为当前最新区块Block + manager.currentBlock = block + + if _, ok := manager.peerBlockCal[validator]; !ok { + manager.peerBlockCal[validator] = make(map[string]int) + manager.peerBlockCal[validator][fromPeer] = 1 + } else { + encodesMap := manager.peerBlockCal[validator] + if _, ok := encodesMap[fromPeer]; !ok { + manager.peerBlockCal[validator][fromPeer] = 1 + } else { + manager.peerBlockCal[validator][fromPeer] = manager.peerBlockCal[validator][fromPeer] + 1 + } + } + + //keyForTime := validator + "-" + fromPeer + //if _, ok := manager.peerBlockTimeCal[keyForTime]; !ok { + // manager.peerBlockTimeCal[keyForTime] = 1 + //} else { + // manager.peerBlockTimeCal[keyForTime] = manager.peerBlockTimeCal[keyForTime] + 1 //} + //var sortedList []string + //for k := range manager.peerBlockTimeCal { + // sortedList = append(sortedList, k) + //} + // + //// 对 Slice 进行排序 + //sort.Slice(sortedList, func(i, j int) bool { + // return manager.peerBlockTimeCal[sortedList[i]] > manager.peerBlockTimeCal[sortedList[j]] + //}) + // + //// 输出排序后的结果 + //index := 0 + //for _, k := range sortedList { + // if index == 3 { + // break + // } + // fmt.Printf("Validator-EncodeID%s: %d\n", k, manager.peerBlockTimeCal[k]) + // index++ + //} + //log.Info("PeerCal Validator", "数量", len(manager.peerBlockCal)) +} + +func (manager *PeerManager) CreateBlockInfoBroadcastScheduler(second int) { + timezone, _ := time.LoadLocation("Asia/Shanghai") + s := gocron.NewScheduler(timezone) + s.Every(second).Seconds().Do(func() { + //定时广播最新Block + if manager.broadcastBlockFunc != nil && manager.currentBlock != nil { + log.Info("🍻🍻🍻 Sylar Log:定时器开始发送Block信息到Peer", "BlockNum", manager.currentBlock.NumberU64()) + manager.broadcastBlockFunc(manager.currentBlock, true) + GetPeerFilter().PutNumber(manager.currentBlock.NumberU64(), manager.currentBlock.Difficulty()) + } + }) + s.StartAsync() +} + +func (manager *PeerManager) RegisterBroadcastBlockHandlerFunc(f func(block *types.Block, propagate bool)) { + manager.broadcastBlockFunc = f +} + +func (manager *PeerManager) FetchValidatorBlockCounterInfo() []*Global.BlockCalCounterInfoModel { + manager.blockInfoLock.Lock() + defer manager.blockInfoLock.Unlock() + //获取每个Validator的最大EncodeID出现次数 + var encodeIDRecorder []*Global.BlockCalCounterInfoModel + for validatorStr, encodeMap := range manager.peerBlockCal { + maxPeerID := "" + maxTimes := 0 + for peerID, appearTimes := range encodeMap { + if appearTimes > maxTimes { + maxPeerID = peerID + maxTimes = appearTimes + } + } + if maxPeerID != "" && maxTimes != 0 { + if _, ok := manager.peersExistAllTimes[maxPeerID]; ok { + encodeIDRecorder = append(encodeIDRecorder, &Global.BlockCalCounterInfoModel{ + Validator: validatorStr, + PeerID: manager.peersExistAllTimes[maxPeerID].PeerID, + Times: maxTimes, + }) + //log.Info("MaxPeerID", "PeerID", maxPeerID, "EncodeID", manager.peersExistAllTimes[maxPeerID].PeerID) + } else { + //log.Info("NotFound MaxPeerID", "PeerID", maxPeerID) + } + } + } + if len(encodeIDRecorder) > 0 { + return encodeIDRecorder + } + return nil +} + +func (manager *PeerManager) ResetBlockCounter() { + manager.blockInfoLock.Lock() + defer manager.blockInfoLock.Unlock() + manager.peerBlockCal = make(map[string]map[string]int) + manager.blockInfoRecordLastTime = time.Now() } -func (p *peerManager) fetchPeerEncodeID(peerID string) (string, error) { - if _, foundedPeer := p.peers[peerID]; !foundedPeer { - return "", errors.New("not found PeerID Info") +func (manager *PeerManager) FetchLastRecordBlockCounterTimeStr() string { + if manager.blockInfoRecordLastTime.IsZero() { + return "" } - return p.peers[peerID], nil + return Global.GetStringTime(manager.blockInfoRecordLastTime) }