diff --git a/p2p/base_reactor.go b/p2p/base_reactor.go index 3e10b6835..833d7984c 100644 --- a/p2p/base_reactor.go +++ b/p2p/base_reactor.go @@ -112,7 +112,15 @@ func (br *BaseReactor) RecvRoutine() { for { select { case msg := <-br.recvMsgBuf: - br.impl.Receive(msg.ChID, msg.Peer, msg.Msg) + if nr, ok := br.impl.(EnvelopeReceiver); ok { + nr.ReceiveEnvelope(Envelope{ + ChannelID: msg.ChID, + Src: msg.Peer, + Message: msg.Message, + }) + } else { + br.impl.Receive(msg.ChID, msg.Peer, msg.Msg) + } case <-br.Quit(): return } diff --git a/p2p/peer.go b/p2p/peer.go index f48183683..ee3301be7 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -53,9 +53,10 @@ type Peer interface { } type BufferedMsg struct { - ChID byte - Peer Peer - Msg []byte + ChID byte + Peer Peer + Msg []byte + Message proto.Message } type EnvelopeSender interface { @@ -539,7 +540,7 @@ func createMConnection( } } p.metrics.PeerReceiveBytesTotal.With(labels...).Add(float64(len(msgBytes))) -<<<<<<< HEAD + p.metrics.MessageReceiveBytesTotal.With("message_type", p.mlc.ValueToMetricLabel(msg)).Add(float64(len(msgBytes))) if config.RecvAsync { ch := reactor.GetRecvChan() p.metrics.NumPooledPeerMsgs.With(labels...).Set(float64(len(ch))) @@ -548,16 +549,13 @@ func createMConnection( copied := make([]byte, len(msgBytes)) copy(copied, msgBytes) // if the channel is full, we are blocking a message until it can send into the channel - ch <- &BufferedMsg{ChID: chID, Peer: p, Msg: copied} -======= - p.metrics.MessageReceiveBytesTotal.With("message_type", p.mlc.ValueToMetricLabel(msg)).Add(float64(len(msgBytes))) - if nr, ok := reactor.(EnvelopeReceiver); ok { + ch <- &BufferedMsg{ChID: chID, Peer: p, Msg: copied, Message: msg} + } else if nr, ok := reactor.(EnvelopeReceiver); ok { nr.ReceiveEnvelope(Envelope{ ChannelID: chID, Src: p, Message: msg, }) ->>>>>>> bdedf2ec2 (p2p: add a per-message type send and receive metric (backport #9622) (#9641)) } else { reactor.Receive(chID, p, msgBytes) }