Skip to content

Commit

Permalink
Factor message queue and friends into ./msgq
Browse files Browse the repository at this point in the history
  • Loading branch information
Eugene Kim committed Oct 8, 2019
1 parent 4e479e2 commit 39ec270
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 32 deletions.
49 changes: 49 additions & 0 deletions msgq/msgq.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package msgq

import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/pkg/errors"

"github.com/harmony-one/harmony/node"
)

// MessageHandler is a message handler.
type MessageHandler interface {
HandleMessage(content []byte, sender peer.ID)
}

// MessageQueue is a finite-sized message queue. It can be used as an overrun
// protection mechanism.
type MessageQueue struct {
ch chan node.incomingMessage
}

// NewMessageQueue returns a new message queue of the given size, which must be
// non-negative.
func NewMessageQueue(size int) *MessageQueue {
return &MessageQueue{ch: make(chan node.incomingMessage, size)}
}

// AddMessage enqueues a received message for processing. It returns without
// blocking, and may return a queue overrun error.
func (q *MessageQueue) AddMessage(content []byte, sender peer.ID) error {
select {
case q.ch <- node.incomingMessage{content, sender}:
default:
return ErrRxOverrun
}
return nil
}

// HandleMessages dequeues and dispatches incoming messages using the given
// message handler, until the message queue is closed. This function can be
// spawned as a background goroutine, potentially multiple times for a pool.
func (q *MessageQueue) HandleMessages(h MessageHandler) {
for msg := range q.ch {
h.HandleMessage(msg.content, msg.sender)
}
}

// ErrRxOverrun signals that a receive queue has been overrun.
var ErrRxOverrun = errors.New("rx overrun")

6 changes: 3 additions & 3 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ type Node struct {
host p2p.Host

// Incoming messages to process.
incomingMessages chan incomingMessage
rxQueue *MessageQueue

// Service manager.
serviceManager *service.Manager
Expand Down Expand Up @@ -414,7 +414,7 @@ func (node *Node) getTransactionsForNewBlock(
// StartServer starts a server and process the requests by a handler.
func (node *Node) StartServer() {
for i := 0; i < RxWorkers; i++ {
go node.handleIncomingMessages()
go node.rxQueue.HandleMessages(node)
}

// start the goroutine to receive client message
Expand Down Expand Up @@ -542,7 +542,7 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardc
Interface("genesis block header", node.Blockchain().GetHeaderByNumber(0)).
Msg("Genesis block hash")

node.incomingMessages = make(chan incomingMessage, RxQueueSize)
node.rxQueue = NewMessageQueue(RxQueueSize)

// Setup initial state of syncing.
node.peerRegistrationRecord = make(map[string]*syncConfig)
Expand Down
37 changes: 8 additions & 29 deletions node/node_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/ethereum/go-ethereum/rlp"
pb "github.com/golang/protobuf/proto"
"github.com/harmony-one/bls/ffi/go/bls"
libp2p_peer "github.com/libp2p/go-libp2p-core/peer"

"github.com/harmony-one/harmony/api/proto"
proto_discovery "github.com/harmony-one/harmony/api/proto/discovery"
"github.com/harmony-one/harmony/api/proto/message"
Expand All @@ -28,8 +30,6 @@ import (
"github.com/harmony-one/harmony/p2p/host"
"github.com/harmony-one/harmony/shard"
staking "github.com/harmony-one/harmony/staking/types"
libp2p_peer "github.com/libp2p/go-libp2p-core/peer"
"github.com/pkg/errors"
)

const (
Expand All @@ -53,58 +53,37 @@ func (node *Node) receiveGroupMessage(receiver p2p.GroupReceiver) {
}
//utils.Logger().Info("[PUBSUB]", "received group msg", len(msg), "sender", sender)
// skip the first 5 bytes, 1 byte is p2p type, 4 bytes are message size
if err := node.enqueueIncomingMessage(msg[5:], sender); err != nil {
if err := node.rxQueue.AddMessage(msg[5:], sender); err != nil {
utils.Logger().Warn().Err(err).
Str("sender", sender.Pretty()).
Msg("cannot enqueue incoming message for processing")
}
}
}

var errRxOverrun = errors.New("rx overrun")

func (node *Node) enqueueIncomingMessage(
content []byte, sender libp2p_peer.ID,
) error {
select {
case node.incomingMessages <- incomingMessage{content, sender}:
default:
return errRxOverrun
}
return nil
}

func (node *Node) handleIncomingMessages() {
for {
// TODO ek – infinite loop; add shutdown/cleanup logic
msg := <-node.incomingMessages
node.handleMessage(msg.content, msg.sender)
}
}

// handleMessage parses the message and dispatch the actions
func (node *Node) handleMessage(content []byte, sender libp2p_peer.ID) {
// HandleMessage parses the message and dispatch the actions.
func (node *Node) HandleMessage(content []byte, sender libp2p_peer.ID) {
msgCategory, err := proto.GetMessageCategory(content)
if err != nil {
utils.Logger().Error().
Err(err).
Msg("handleMessage get message category failed")
Msg("HandleMessage get message category failed")
return
}

msgType, err := proto.GetMessageType(content)
if err != nil {
utils.Logger().Error().
Err(err).
Msg("handleMessage get message type failed")
Msg("HandleMessage get message type failed")
return
}

msgPayload, err := proto.GetMessagePayload(content)
if err != nil {
utils.Logger().Error().
Err(err).
Msg("handleMessage get message payload failed")
Msg("HandleMessage get message payload failed")
return
}

Expand Down

0 comments on commit 39ec270

Please sign in to comment.