Skip to content

Commit

Permalink
(fixup) Ajust RecvAsync with EnvelopeReceiver
Browse files Browse the repository at this point in the history
  • Loading branch information
tnasu committed Jul 21, 2023
1 parent 0dcc7ef commit b7aa174
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 10 deletions.
10 changes: 9 additions & 1 deletion p2p/base_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,15 @@ func (br *BaseReactor) RecvRoutine() {
for {
select {
case msg := <-br.recvMsgBuf:
br.impl.Receive(msg.ChID, msg.Peer, msg.Msg)
if nr, ok := br.impl.(EnvelopeReceiver); ok {
nr.ReceiveEnvelope(Envelope{
ChannelID: msg.ChID,
Src: msg.Peer,
Message: msg.Message,
})
} else {
br.impl.Receive(msg.ChID, msg.Peer, msg.Msg)
}
case <-br.Quit():
return
}
Expand Down
16 changes: 7 additions & 9 deletions p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,10 @@ type Peer interface {
}

type BufferedMsg struct {
ChID byte
Peer Peer
Msg []byte
ChID byte
Peer Peer
Msg []byte
Message proto.Message
}

type EnvelopeSender interface {
Expand Down Expand Up @@ -539,7 +540,7 @@ func createMConnection(
}
}
p.metrics.PeerReceiveBytesTotal.With(labels...).Add(float64(len(msgBytes)))
<<<<<<< HEAD
p.metrics.MessageReceiveBytesTotal.With("message_type", p.mlc.ValueToMetricLabel(msg)).Add(float64(len(msgBytes)))
if config.RecvAsync {
ch := reactor.GetRecvChan()
p.metrics.NumPooledPeerMsgs.With(labels...).Set(float64(len(ch)))
Expand All @@ -548,16 +549,13 @@ func createMConnection(
copied := make([]byte, len(msgBytes))
copy(copied, msgBytes)
// if the channel is full, we are blocking a message until it can send into the channel
ch <- &BufferedMsg{ChID: chID, Peer: p, Msg: copied}
=======
p.metrics.MessageReceiveBytesTotal.With("message_type", p.mlc.ValueToMetricLabel(msg)).Add(float64(len(msgBytes)))
if nr, ok := reactor.(EnvelopeReceiver); ok {
ch <- &BufferedMsg{ChID: chID, Peer: p, Msg: copied, Message: msg}
} else if nr, ok := reactor.(EnvelopeReceiver); ok {
nr.ReceiveEnvelope(Envelope{
ChannelID: chID,
Src: p,
Message: msg,
})
>>>>>>> bdedf2ec2 (p2p: add a per-message type send and receive metric (backport #9622) (#9641))
} else {
reactor.Receive(chID, p, msgBytes)
}
Expand Down

0 comments on commit b7aa174

Please sign in to comment.