Skip to content

Commit

Permalink
Merge pull request #222 from tock-ibm/hb-dispatch
Browse files Browse the repository at this point in the history
Heartbeat dispatch & skeleton
  • Loading branch information
tock-ibm authored Aug 5, 2019
2 parents d36bf21 + 0e24a7d commit b94942b
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 11 deletions.
20 changes: 17 additions & 3 deletions internal/bft/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,25 @@ func (c *Controller) OnAutoRemoveTimeout(requestInfo types.RequestInfo) {

// ProcessMessages dispatches the incoming message to the required component
func (c *Controller) ProcessMessages(sender uint64, m *protos.Message) {
if IsViewMessage(m) {
switch m.GetContent().(type) {
case *protos.Message_PrePrepare, *protos.Message_Prepare, *protos.Message_Commit:
c.currView.HandleMessage(sender, m)
c.Logger.Debugf("Node %d handled message %v from %d with seq %d", c.ID, m, sender, proposalSequence(m))

case *protos.Message_ViewChange, *protos.Message_ViewData, *protos.Message_NewView:
// TODO view change
c.Logger.Debugf("View change not yet implemented, ignoring message: %v, from %d", m, sender)

case *protos.Message_HeartBeat:
//TODO heartbeat monitor
c.Logger.Debugf("Heartbeat monitor not yet implemented, ignoring message: %v, from %d", m, sender)

case *protos.Message_Error:
c.Logger.Debugf("Error message handling not yet implemented, ignoring message: %v, from %d", m, sender)

default:
c.Logger.Warnf("Unexpected message type, ignoring")
}
c.Logger.Debugf("Node %d handled message %v from %d with seq %d", c.ID, m, sender, proposalSequence(m))
// TODO the msg can be a view change message or a tx req coming from a node after a timeout
}

func (c *Controller) startView(proposalSequence uint64) {
Expand Down
80 changes: 80 additions & 0 deletions internal/bft/heartbeatmonitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright IBM Corp. All Rights Reserved.
//
// SPDX-License-Identifier: Apache-2.0
//

package bft

import (
"sync"
"time"

"github.com/SmartBFT-Go/consensus/pkg/api"
"github.com/SmartBFT-Go/consensus/smartbftprotos"
)

const (
DefaultHeartbeatTimeout = 60 * time.Second
)

//go:generate mockery -dir . -name HeartbeatTimeoutHandler -case underscore -output ./mocks/

// HeartbeatTimeoutHandler defines who to call when a heartbeat timeout expires.
type HeartbeatTimeoutHandler interface {
OnHeartbeatTimeout(view uint64, leaderID uint64)
}

type HeartbeatMonitor struct {
logger api.Logger
hbTimeout time.Duration
hbInterval time.Duration
comm Comm
handler HeartbeatTimeoutHandler
mutex sync.Mutex
timer *time.Timer
view uint64
leaderID uint64
follower bool
}

func NewHeartbeatMonitor(
logger api.Logger,
heartbeatTimeout time.Duration,
comm Comm,
) *HeartbeatMonitor {
if heartbeatTimeout/10 < time.Nanosecond {
return nil
}

hm := &HeartbeatMonitor{
logger: logger,
hbTimeout: heartbeatTimeout,
hbInterval: heartbeatTimeout / 10,
comm: comm,
}
return hm
}

func (hm *HeartbeatMonitor) SetTimeoutHandler(handler HeartbeatTimeoutHandler) {
// TODO
}

// StartFollower will start following the heartbeats of the leader of the view.
func (hm *HeartbeatMonitor) StartFollower(view uint64, leaderID uint64) {
// TODO
}

// StartLeader will start sending heartbeats to all followers.
func (hm *HeartbeatMonitor) StartLeader(view uint64, leaderID uint64) {
// TODO
}

// ProcessMsg handles an incoming heartbeat.
func (hm *HeartbeatMonitor) ProcessMsg(sender uint64, msg *smartbftprotos.HeartBeat) {
// TODO
}

// Close stops following or sending heartbeats.
func (hm *HeartbeatMonitor) Close() {
// TODO
}
30 changes: 30 additions & 0 deletions internal/bft/heartbeatmonitor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright IBM Corp. All Rights Reserved.
//
// SPDX-License-Identifier: Apache-2.0
//

package bft_test

import (
"testing"
"time"

"github.com/SmartBFT-Go/consensus/internal/bft"
"github.com/SmartBFT-Go/consensus/internal/bft/mocks"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)

func TestHeartbeatMonitor_New(t *testing.T) {
basicLog, err := zap.NewDevelopment()
assert.NoError(t, err)
log := basicLog.Sugar()

comm := &mocks.CommMock{}
handler := &mocks.HeartbeatTimeoutHandler{}

hm := bft.NewHeartbeatMonitor(log, time.Hour, comm)
assert.NotNil(t, hm)
hm.SetTimeoutHandler(handler)
hm.Close()
}
15 changes: 15 additions & 0 deletions internal/bft/mocks/heartbeat_timeout_handler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 0 additions & 4 deletions internal/bft/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ import (
"github.com/golang/protobuf/proto"
)

func IsViewMessage(m *protos.Message) bool {
return m.GetCommit() != nil || m.GetPrepare() != nil || m.GetPrePrepare() != nil
}

func viewNumber(m *protos.Message) uint64 {
if pp := m.GetPrePrepare(); pp != nil {
return pp.GetView()
Expand Down
5 changes: 1 addition & 4 deletions pkg/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,7 @@ func (c *Consensus) Stop() {
}

func (c *Consensus) HandleMessage(sender uint64, m *protos.Message) {
if algorithm.IsViewMessage(m) {
c.controller.ProcessMessages(sender, m)
}

c.controller.ProcessMessages(sender, m)
}

func (c *Consensus) HandleRequest(sender uint64, req []byte) {
Expand Down

0 comments on commit b94942b

Please sign in to comment.