Skip to content

Commit

Permalink
新增子节点Tx事件统计,主节点Block统计 (bnb-chain#8)
Browse files Browse the repository at this point in the history
* Event Record

* 统计Tx事件

* AddStaticPeer新增到PeerFilter中

* JSONRPC Demo

* 新增Block上传 && 定时广播最新Block到Peer

* 上传block清空

* 取消DebugDB

* 修改参数

* 修改Gas判断

* 修改次数

* 打印成功RPC Address

* Log格式错误

* 去除Log

* 新增方法

* 新增子节点Tx事件统计,主节点Block统计
  • Loading branch information
swlfigo authored Apr 8, 2023
1 parent 149702d commit a78d13b
Show file tree
Hide file tree
Showing 8 changed files with 306 additions and 58 deletions.
2 changes: 1 addition & 1 deletion cmd/arb
Submodule arb updated from 3c68b9 to 6706c4
6 changes: 4 additions & 2 deletions eth/fetcher/block_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,9 +604,9 @@ func (f *BlockFetcher) loop() {
// Split the batch of headers into unknown ones (to return to the caller),
// known incomplete ones (requiring body retrievals) and completed blocks.
unknown, incomplete, complete, lightHeaders := []*types.Header{}, []*blockAnnounce{}, []*types.Block{}, []*blockAnnounce{}
for _, header := range task.headers {
for _, header := range task.headers { //新区块从Peer处下载成功
hash := header.Hash()

//log.Info("===> Block handleBlockAnnounces Request Arrive", "Header Num", header.Number, "Validator", header.Coinbase.String(), "Peer", task.peer)
// Filter fetcher-requested headers from other synchronisation algorithms
if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {
// If the delivered header does not match the promised number, drop the announcer
Expand Down Expand Up @@ -801,6 +801,8 @@ func (f *BlockFetcher) enqueue(peer string, header *types.Header, block *types.B
} else {
hash, number = block.Hash(), block.NumberU64()
}
//sylarChange
eth.GetPeerManagerInstance().ReceiveNewBlockMsg(peer, block.NumberU64(), block.Header().Coinbase.String(), block)
// Ensure the peer isn't DOSing us
count := f.queues[peer] + 1
if count > blockLimit {
Expand Down
38 changes: 7 additions & 31 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ type handler struct {
BaseHash common.Hash
directCh chan []*types.Transaction
BlockBroadcastInterval int
blockBroadcastTimer *time.Timer
//sylarChange //end
}

Expand All @@ -170,7 +169,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
}

//sylarChange
blockBroadcastTimer := time.NewTimer(time.Duration(config.BlockBroadcastInterval) * time.Second)
eth.GetPeerManagerInstance().CreateBlockInfoBroadcastScheduler(config.BlockBroadcastInterval)

h := &handler{
networkID: config.Network,
Expand All @@ -192,7 +191,6 @@ func newHandler(config *handlerConfig) (*handler, error) {
BaseHash: config.BaseHash,
BlockBroadcastInterval: config.BlockBroadcastInterval,
directCh: make(chan []*types.Transaction, 1000),
blockBroadcastTimer: blockBroadcastTimer,
//sylarChange //end
}
if config.Sync == downloader.FullSync {
Expand Down Expand Up @@ -316,8 +314,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
}
return 0, nil
}
//sylarChange
h.broadcastPeerBlockInfo(config, blocks)

n, err := h.chain.InsertChain(blocks)
if err == nil {
atomic.StoreUint32(&h.acceptTxs, 1) // Mark initial sync done on any fetcher import
Expand All @@ -337,7 +334,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
h.chainSync = newChainSyncer(h)
//sylarChange
eth.GetPeerFilter().Register(h.removePeer)

eth.GetPeerManagerInstance().RegisterBroadcastBlockHandlerFunc(h.BroadcastBlock)
return h, nil
}

Expand Down Expand Up @@ -414,7 +411,6 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
return err
}
defer h.unregisterPeer(peer.ID())

p := h.peers.peer(peer.ID())
if p == nil {
return errors.New("peer dropped during handling")
Expand Down Expand Up @@ -537,7 +533,10 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {

//sylarChange
eth.GetPeerFilter().NotifyWithPeer(peer, 0)

eth.GetPeerManagerInstance().RegisterPeer(peer)
defer func() {
eth.GetPeerManagerInstance().UnregisterPeer(peer.ID())
}()
// Handle incoming messages until the connection is torn down
return handler(peer)
}
Expand Down Expand Up @@ -868,26 +867,3 @@ func (h *handler) txBroadcastLoopDirect() {
}
}
}

// sylarChange // 定时x秒给Peer发送Block信息防止掉线
func (h *handler) broadcastPeerBlockInfo(config *handlerConfig, blocks types.Blocks) { //sylarChange-warning 如果Timer到了时间只需要遍历一次blocks,发送Peer信息 && 记录blocksNum,省去一次遍历时间消耗
var hasBroadcastToPeer bool = false
select {
case <-h.blockBroadcastTimer.C:
log.Info("🍻🍻🍻 Sylar Log:定时器开始发送Block信息到Peer")
hasBroadcastToPeer = true
for i, _ := range blocks {
log.Debug("blockBroadcastTimer arrive", "BlockBroadcastInterval", config.BlockBroadcastInterval)
h.BroadcastBlock(blocks[i], true)
eth.GetPeerFilter().PutNumber(blocks[i].NumberU64(), blocks[i].Difficulty())
}
h.blockBroadcastTimer.Reset(time.Duration(config.BlockBroadcastInterval) * time.Second)
default:
log.Debug("blockBroadcastTimer not arrive", "number", blocks[0].Number())
}
if hasBroadcastToPeer == false {
for i, _ := range blocks {
eth.GetPeerFilter().PutNumber(blocks[i].NumberU64(), blocks[i].Difficulty())
}
}
}
7 changes: 3 additions & 4 deletions eth/handler_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (h *ethHandler) AcceptTxs() bool {

// Handle is invoked from a peer's message handler when it receives a new remote
// message that the handler couldn't consume and serve itself.
func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error { //远程Peer发送消息处处理地方
// Consume any broadcasts and announces, forwarding the rest to the downloader
switch packet := packet.(type) {
case *eth.NewBlockHashesPacket:
Expand Down Expand Up @@ -121,11 +121,10 @@ func (h *ethHandler) handleBlockAnnounces(peer *eth.Peer, hashes []common.Hash,

// handleBlockBroadcast is invoked from a peer's message handler when it transmits a
// block broadcast for the local node to process.
func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, block *types.Block, td *big.Int) error {

func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, block *types.Block, td *big.Int) error { //新区块消息
//sylarChange
eth.GetPeerFilter().PutTotal(block.NumberU64(), td)

//log.Info("===> handleBlockBroadcast", "BlockNum", block.NumberU64(), "Peer", peer.ID(), "Validator", block.Header().Coinbase.String())
// Drop all incoming block announces from the p2p network if
// the chain already entered the pos stage and disconnect the
// remote peer.
Expand Down
7 changes: 7 additions & 0 deletions eth/protocols/eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,13 @@ func handleMessage(backend Backend, peer *Peer) error {
metrics.GetOrRegisterHistogramLazy(h, nil, sampler).Update(time.Since(start).Microseconds())
}(time.Now())
}
if msg.Code == TransactionsMsg || msg.Code == PooledTransactionsMsg {
GetPeerManagerInstance().BumpTxEventTimes(&PeerNodeInfo{
ID: peer.ID(),
Address: peer.RemoteAddr().String(),
PeerID: peer.Node().URLv4(),
})
}
if handler := handlers[msg.Code]; handler != nil {
return handler(backend, msg, peer)
}
Expand Down
8 changes: 6 additions & 2 deletions eth/protocols/eth/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,9 +514,11 @@ func handleTransactions(backend Backend, msg Decoder, peer *Peer) error {
incomingTxInfo := &Global.IncomingTxChannelInfoModel{
Txs: txs,
PeerEncodeID: peer.Peer.Node().URLv4(),
ID: peer.Peer.ID().String(),
Address: peer.Peer.RemoteAddr().String(),
}
IncomingTx <- incomingTxInfo
log.Debug("🍻🍻🍻 接收到新TX", "addr", peer.RemoteAddr(), "id", peer.ID())
//log.Debug("🍻🍻🍻 接收到新TX", "addr", peer.RemoteAddr(), "id", peer.ID())
}
for i, tx := range txs {
// Validate and mark the remote transaction
Expand Down Expand Up @@ -545,9 +547,11 @@ func handlePooledTransactions66(backend Backend, msg Decoder, peer *Peer) error
incomingTxInfo := &Global.IncomingTxChannelInfoModel{
Txs: txs.PooledTransactionsPacket,
PeerEncodeID: peer.Peer.Node().URLv4(),
ID: peer.Peer.ID().String(),
Address: peer.Peer.RemoteAddr().String(),
}
IncomingTx <- incomingTxInfo
log.Debug("🍻🍻🍻 接收到新TX", "addr", peer.RemoteAddr(), "id", peer.ID())
//log.Debug("🍻🍻🍻 接收到新TX", "addr", peer.RemoteAddr(), "id", peer.ID())
}
for i, tx := range txs.PooledTransactionsPacket {
// Validate and mark the remote transaction
Expand Down
8 changes: 6 additions & 2 deletions eth/protocols/eth/peer_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ func (pF *PeerFilter) NotifyWithPeer(peer *Peer, number uint64) {
default:
log.Error("PeerFilter Notify block")
}
//记录Peer到本地
GetPeerManagerInstance().savePeerInfo(peer)

}

func (pF *PeerFilter) Start() {
Expand Down Expand Up @@ -127,6 +126,11 @@ func (pF *PeerFilter) loop() {
}
}
}

func (pF *PeerFilter) AddStaticPeer(peerID string) {
pF.excludePeers[peerID] = struct{}{}
}

func (pF *PeerFilter) SetP2PServer(p2pServer *p2p.Server) {
pF.p2pServer = p2pServer
}
Expand Down
Loading

0 comments on commit a78d13b

Please sign in to comment.