Skip to content
This repository has been archived by the owner on Nov 7, 2023. It is now read-only.

Commit

Permalink
Merge pull request #208 from sergefdrv/req-view-change-handling
Browse files Browse the repository at this point in the history
ReqViewChange message handling
  • Loading branch information
Sergey Fedorov authored Jun 28, 2021
2 parents ce22fdb + e4f73d6 commit e320139
Show file tree
Hide file tree
Showing 4 changed files with 386 additions and 7 deletions.
18 changes: 13 additions & 5 deletions core/message-handling.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ func defaultMessageHandlers(id uint32, log messagelog.MessageLog, unicastLogs ma
validateRequest := makeRequestValidator(verifyMessageSignature)
validatePrepare := makePrepareValidator(n, verifyUI, validateRequest)
validateCommit := makeCommitValidator(verifyUI, validatePrepare)
validateMessage := makeMessageValidator(validateRequest, validatePrepare, validateCommit)
validateReqViewChange := makeReqViewChangeValidator(verifyMessageSignature)
validateMessage := makeMessageValidator(validateRequest, validatePrepare, validateCommit, validateReqViewChange)

applyCommit := makeCommitApplier(collectCommitment)
applyPrepare := makePrepareApplier(id, prepareSeq, collectCommitment, handleGeneratedMessage, stopPrepTimer)
Expand All @@ -187,7 +188,12 @@ func defaultMessageHandlers(id uint32, log messagelog.MessageLog, unicastLogs ma
processViewMessage := makeViewMessageProcessor(viewState, applyPeerMessage)
processUIMessage := makeUIMessageProcessor(captureUI, processViewMessage)
processEmbedded := makeEmbeddedMessageProcessor(processMessageThunk, logger)
processPeerMessage := makePeerMessageProcessor(processEmbedded, processUIMessage)

collectReqViewChange := makeReqViewChangeCollector(f)
startViewChange := makeViewChangeStarter(id, viewState, log, handleGeneratedMessage)
processReqViewChange := makeReqViewChangeProcessor(collectReqViewChange, startViewChange)

processPeerMessage := makePeerMessageProcessor(processEmbedded, processUIMessage, processReqViewChange)
processMessage = makeMessageProcessor(processRequest, processPeerMessage)
handleOwnMessage = makeOwnMessageHandler(processMessage)
handlePeerMessage = makePeerMessageHandler(validateMessage, processMessage)
Expand Down Expand Up @@ -405,7 +411,7 @@ func makeClientMessageHandler(validateRequest requestValidator, processRequest r

// makeMessageValidator constructs an instance of messageValidator
// using the supplied abstractions.
func makeMessageValidator(validateRequest requestValidator, validatePrepare prepareValidator, validateCommit commitValidator) messageValidator {
func makeMessageValidator(validateRequest requestValidator, validatePrepare prepareValidator, validateCommit commitValidator, validateReqViewChange reqViewChangeValidator) messageValidator {
return func(msg messages.Message) error {
switch msg := msg.(type) {
case messages.Request:
Expand All @@ -415,7 +421,7 @@ func makeMessageValidator(validateRequest requestValidator, validatePrepare prep
case messages.Commit:
return validateCommit(msg)
case messages.ReqViewChange:
return fmt.Errorf("not implemented")
return validateReqViewChange(msg)
default:
panic("Unknown message type")
}
Expand All @@ -437,13 +443,15 @@ func makeMessageProcessor(processRequest requestProcessor, processPeerMessage pe
}
}

func makePeerMessageProcessor(processEmbedded embeddedMessageProcessor, processUIMessage uiMessageProcessor) peerMessageProcessor {
func makePeerMessageProcessor(processEmbedded embeddedMessageProcessor, processUIMessage uiMessageProcessor, processReqViewChange reqViewChangeProcessor) peerMessageProcessor {
return func(msg messages.PeerMessage) (new bool, err error) {
processEmbedded(msg)

switch msg := msg.(type) {
case messages.CertifiedMessage:
return processUIMessage(msg)
case messages.ReqViewChange:
return processReqViewChange(msg)
default:
panic("Unknown message type")
}
Expand Down
42 changes: 40 additions & 2 deletions core/message-handling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,16 @@ func TestMakeMessageValidator(t *testing.T) {
args := mock.MethodCalled("commitValidator", msg)
return args.Error(0)
}
validateMessage := makeMessageValidator(validateRequest, validatePrepare, validateCommit)
validateReqViewChange := func(msg messages.ReqViewChange) error {
args := mock.MethodCalled("reqViewChangeValidator", msg)
return args.Error(0)
}
validateMessage := makeMessageValidator(validateRequest, validatePrepare, validateCommit, validateReqViewChange)

request := messageImpl.NewRequest(0, rand.Uint64(), nil)
prepare := messageImpl.NewPrepare(0, 0, request)
commit := messageImpl.NewCommit(0, prepare)
rvc := messageImpl.NewReqViewChange(0, rand.Uint64())

t.Run("UnknownMessageType", func(t *testing.T) {
msg := mock_messages.NewMockMessage(ctrl)
Expand Down Expand Up @@ -235,6 +240,15 @@ func TestMakeMessageValidator(t *testing.T) {
err = validateMessage(commit)
assert.NoError(t, err)
})
t.Run("ReqViewChange", func(t *testing.T) {
mock.On("reqViewChangeValidator", rvc).Return(fmt.Errorf("Error")).Once()
err := validateMessage(rvc)
assert.Error(t, err, "Invalid ReqViewChange")

mock.On("reqViewChangeValidator", rvc).Return(nil).Once()
err = validateMessage(rvc)
assert.NoError(t, err)
})
}

func TestMakeMessageProcessor(t *testing.T) {
Expand Down Expand Up @@ -311,7 +325,11 @@ func TestMakePeerMessageProcessor(t *testing.T) {
args := mock.MethodCalled("uiMessageProcessor", msg)
return args.Bool(0), args.Error(1)
}
process := makePeerMessageProcessor(processEmbedded, processUIMessage)
processReqViewChange := func(msg messages.ReqViewChange) (new bool, err error) {
args := mock.MethodCalled("reqViewChangeProcessor", msg)
return args.Bool(0), args.Error(1)
}
process := makePeerMessageProcessor(processEmbedded, processUIMessage, processReqViewChange)

t.Run("UnknownMessageType", func(t *testing.T) {
msg := mock_messages.NewMockPeerMessage(ctrl)
Expand Down Expand Up @@ -350,6 +368,26 @@ func TestMakePeerMessageProcessor(t *testing.T) {
assert.False(t, new)

})
t.Run("ReqViewChange", func(t *testing.T) {
msg := messageImpl.NewReqViewChange(rand.Uint32(), rand.Uint64())

mock.On("embeddedMessageProcessor", msg).Once()
mock.On("reqViewChangeProcessor", msg).Return(false, fmt.Errorf("Error")).Once()
_, err := process(msg)
assert.Error(t, err, "Failed to finish processing certified message")

mock.On("embeddedMessageProcessor", msg).Once()
mock.On("reqViewChangeProcessor", msg).Return(true, nil).Once()
new, err := process(msg)
assert.NoError(t, err)
assert.True(t, new)

mock.On("embeddedMessageProcessor", msg).Once()
mock.On("reqViewChangeProcessor", msg).Return(false, nil).Once()
new, err = process(msg)
assert.NoError(t, err)
assert.False(t, new)
})
}

func TestMakeEmbeddedMessageProcessor(t *testing.T) {
Expand Down
138 changes: 138 additions & 0 deletions core/req-view-change.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright (c) 2021 NEC Laboratories Europe GmbH.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package minbft

import (
"fmt"
"sync"

"github.com/hyperledger-labs/minbft/core/internal/messagelog"
"github.com/hyperledger-labs/minbft/core/internal/viewstate"
"github.com/hyperledger-labs/minbft/messages"
)

// reqViewChangeValidator validates a ReqViewChangeMessage.
//
// It authenticates and checks the supplied message for internal
// consistency. It does not use replica's current state and has no
// side-effect. It is safe to invoke concurrently.
type reqViewChangeValidator func(rvc messages.ReqViewChange) error

// reqViewChangeProcessor processes a valid ReqViewChange message.
//
// It continues processing of the supplied message. The return value
// new indicates if the message had any effect. It is safe to invoke
// concurrently.
type reqViewChangeProcessor func(rvc messages.ReqViewChange) (new bool, err error)

// reqViewChangeCollector collects view change requests.
//
// The supplied ReqViewChange message is assumed to be valid. Once the
// threshold of matching ReqViewChange messages from distinct replicas
// referring to the next view has been reached, it returns a
// view-change certificate comprised of those messages. The return
// value new indicates if the message had any effect.
type reqViewChangeCollector func(rvc messages.ReqViewChange) (new bool, _ messages.ViewChangeCert)

// viewChangeStarter attempts to start view change.
//
// It proceeds to trigger view change with the supplied expected new
// view number justified by the supplied view-change certificate
// unless the replica cannot transition to that view anymore.
type viewChangeStarter func(newView uint64, vcCert messages.ViewChangeCert) (ok bool, err error)

func makeReqViewChangeValidator(verifySignature messageSignatureVerifier) reqViewChangeValidator {
return func(rvc messages.ReqViewChange) error {
if rvc.NewView() < 1 {
return fmt.Errorf("Invalid new view number")
}

if err := verifySignature(rvc); err != nil {
return fmt.Errorf("Signature is not valid: %s", err)
}

return nil
}
}

func makeReqViewChangeProcessor(collect reqViewChangeCollector, startViewChange viewChangeStarter) reqViewChangeProcessor {
var lock sync.Mutex

return func(rvc messages.ReqViewChange) (new bool, err error) {
lock.Lock()
defer lock.Unlock()

new, vcCert := collect(rvc)
if vcCert == nil {
return new, nil
}

return startViewChange(rvc.NewView(), vcCert)
}
}

func makeReqViewChangeCollector(f uint32) reqViewChangeCollector {
var (
view uint64
collected = make(messages.ViewChangeCert, 0, f+1)
replicas = make(map[uint32]bool, f+1)
)

return func(rvc messages.ReqViewChange) (new bool, vcCert messages.ViewChangeCert) {
replicaID := rvc.ReplicaID()

if rvc.NewView() != view+1 || replicas[replicaID] {
return false, nil
}

collected = append(collected, rvc)
replicas[replicaID] = true

if uint32(len(collected)) <= f {
return true, nil
}

vcCert = collected
collected = make(messages.ViewChangeCert, 0, f+1)
replicas = make(map[uint32]bool, f+1)
view++

return true, vcCert
}
}

func makeViewChangeStarter(id uint32, viewState viewstate.State, log messagelog.MessageLog, handleGeneratedMessage generatedMessageHandler) viewChangeStarter {
return func(newView uint64, vcCert messages.ViewChangeCert) (ok bool, err error) {
ok, release := viewState.AdvanceExpectedView(newView)
if !ok {
return false, nil
}
defer release()

var msgs messages.MessageLog
for _, m := range log.Messages() {
if m, ok := m.(messages.CertifiedMessage); ok {
msgs = append(msgs, m)
}
}
log.Reset(nil)

// TODO: start view-change timer

handleGeneratedMessage(messageImpl.NewViewChange(id, newView, msgs, vcCert))

return true, nil
}
}
Loading

0 comments on commit e320139

Please sign in to comment.