diff --git a/internal/bft/controller.go b/internal/bft/controller.go index 1a3a5306..315e46e3 100644 --- a/internal/bft/controller.go +++ b/internal/bft/controller.go @@ -213,11 +213,25 @@ func (c *Controller) OnAutoRemoveTimeout(requestInfo types.RequestInfo) { // ProcessMessages dispatches the incoming message to the required component func (c *Controller) ProcessMessages(sender uint64, m *protos.Message) { - if IsViewMessage(m) { + switch m.GetContent().(type) { + case *protos.Message_PrePrepare, *protos.Message_Prepare, *protos.Message_Commit: c.currView.HandleMessage(sender, m) + c.Logger.Debugf("Node %d handled message %v from %d with seq %d", c.ID, m, sender, proposalSequence(m)) + + case *protos.Message_ViewChange, *protos.Message_ViewData, *protos.Message_NewView: + // TODO view change + c.Logger.Debugf("View change not yet implemented, ignoring message: %v, from %d", m, sender) + + case *protos.Message_HeartBeat: + //TODO heartbeat monitor + c.Logger.Debugf("Heartbeat monitor not yet implemented, ignoring message: %v, from %d", m, sender) + + case *protos.Message_Error: + c.Logger.Debugf("Error message handling not yet implemented, ignoring message: %v, from %d", m, sender) + + default: + c.Logger.Warnf("Unexpected message type, ignoring") } - c.Logger.Debugf("Node %d handled message %v from %d with seq %d", c.ID, m, sender, proposalSequence(m)) - // TODO the msg can be a view change message or a tx req coming from a node after a timeout } func (c *Controller) startView(proposalSequence uint64) { diff --git a/internal/bft/heartbeatmonitor.go b/internal/bft/heartbeatmonitor.go new file mode 100644 index 00000000..077ec884 --- /dev/null +++ b/internal/bft/heartbeatmonitor.go @@ -0,0 +1,80 @@ +// Copyright IBM Corp. All Rights Reserved. +// +// SPDX-License-Identifier: Apache-2.0 +// + +package bft + +import ( + "sync" + "time" + + "github.com/SmartBFT-Go/consensus/pkg/api" + "github.com/SmartBFT-Go/consensus/smartbftprotos" +) + +const ( + DefaultHeartbeatTimeout = 60 * time.Second +) + +//go:generate mockery -dir . -name HeartbeatTimeoutHandler -case underscore -output ./mocks/ + +// HeartbeatTimeoutHandler defines who to call when a heartbeat timeout expires. +type HeartbeatTimeoutHandler interface { + OnHeartbeatTimeout(view uint64, leaderID uint64) +} + +type HeartbeatMonitor struct { + logger api.Logger + hbTimeout time.Duration + hbInterval time.Duration + comm Comm + handler HeartbeatTimeoutHandler + mutex sync.Mutex + timer *time.Timer + view uint64 + leaderID uint64 + follower bool +} + +func NewHeartbeatMonitor( + logger api.Logger, + heartbeatTimeout time.Duration, + comm Comm, +) *HeartbeatMonitor { + if heartbeatTimeout/10 < time.Nanosecond { + return nil + } + + hm := &HeartbeatMonitor{ + logger: logger, + hbTimeout: heartbeatTimeout, + hbInterval: heartbeatTimeout / 10, + comm: comm, + } + return hm +} + +func (hm *HeartbeatMonitor) SetTimeoutHandler(handler HeartbeatTimeoutHandler) { + // TODO +} + +// StartFollower will start following the heartbeats of the leader of the view. +func (hm *HeartbeatMonitor) StartFollower(view uint64, leaderID uint64) { + // TODO +} + +// StartLeader will start sending heartbeats to all followers. +func (hm *HeartbeatMonitor) StartLeader(view uint64, leaderID uint64) { + // TODO +} + +// ProcessMsg handles an incoming heartbeat. +func (hm *HeartbeatMonitor) ProcessMsg(sender uint64, msg *smartbftprotos.HeartBeat) { + // TODO +} + +// Close stops following or sending heartbeats. +func (hm *HeartbeatMonitor) Close() { + // TODO +} diff --git a/internal/bft/heartbeatmonitor_test.go b/internal/bft/heartbeatmonitor_test.go new file mode 100644 index 00000000..9f4362fe --- /dev/null +++ b/internal/bft/heartbeatmonitor_test.go @@ -0,0 +1,30 @@ +// Copyright IBM Corp. All Rights Reserved. +// +// SPDX-License-Identifier: Apache-2.0 +// + +package bft_test + +import ( + "testing" + "time" + + "github.com/SmartBFT-Go/consensus/internal/bft" + "github.com/SmartBFT-Go/consensus/internal/bft/mocks" + "github.com/stretchr/testify/assert" + "go.uber.org/zap" +) + +func TestHeartbeatMonitor_New(t *testing.T) { + basicLog, err := zap.NewDevelopment() + assert.NoError(t, err) + log := basicLog.Sugar() + + comm := &mocks.CommMock{} + handler := &mocks.HeartbeatTimeoutHandler{} + + hm := bft.NewHeartbeatMonitor(log, time.Hour, comm) + assert.NotNil(t, hm) + hm.SetTimeoutHandler(handler) + hm.Close() +} diff --git a/internal/bft/mocks/heartbeat_timeout_handler.go b/internal/bft/mocks/heartbeat_timeout_handler.go new file mode 100644 index 00000000..4de03f54 --- /dev/null +++ b/internal/bft/mocks/heartbeat_timeout_handler.go @@ -0,0 +1,15 @@ +// Code generated by mockery v1.0.0. DO NOT EDIT. + +package mocks + +import mock "github.com/stretchr/testify/mock" + +// HeartbeatTimeoutHandler is an autogenerated mock type for the HeartbeatTimeoutHandler type +type HeartbeatTimeoutHandler struct { + mock.Mock +} + +// OnHeartbeatTimeout provides a mock function with given fields: view, leaderID +func (_m *HeartbeatTimeoutHandler) OnHeartbeatTimeout(view uint64, leaderID uint64) { + _m.Called(view, leaderID) +} diff --git a/internal/bft/util.go b/internal/bft/util.go index ad216069..d22612cf 100644 --- a/internal/bft/util.go +++ b/internal/bft/util.go @@ -13,10 +13,6 @@ import ( "github.com/golang/protobuf/proto" ) -func IsViewMessage(m *protos.Message) bool { - return m.GetCommit() != nil || m.GetPrepare() != nil || m.GetPrePrepare() != nil -} - func viewNumber(m *protos.Message) uint64 { if pp := m.GetPrePrepare(); pp != nil { return pp.GetView() diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index 4aa66d26..00f87bcd 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -98,10 +98,7 @@ func (c *Consensus) Stop() { } func (c *Consensus) HandleMessage(sender uint64, m *protos.Message) { - if algorithm.IsViewMessage(m) { - c.controller.ProcessMessages(sender, m) - } - + c.controller.ProcessMessages(sender, m) } func (c *Consensus) HandleRequest(sender uint64, req []byte) {