diff --git a/api/api.go b/api/api.go index 599f8c26..49129163 100644 --- a/api/api.go +++ b/api/api.go @@ -42,8 +42,13 @@ type Configer interface { // L: must be larger than CheckpointPeriod Logsize() uint32 - // starts when receives a request and stops when request is accepted + // starts when receives a prepare message and stops when request + // is accepted TimeoutRequest() time.Duration + + // starts when receives a request and stops when request is prepared + TimeoutPrepare() time.Duration + // starts when sends VIEW-CHANGE and stops when receives a valid NEW-VIEW TimeoutViewChange() time.Duration } diff --git a/api/mocks/mock.go b/api/mocks/mock.go index 5d5605c6..3980a971 100644 --- a/api/mocks/mock.go +++ b/api/mocks/mock.go @@ -90,6 +90,20 @@ func (mr *MockConfigerMockRecorder) N() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "N", reflect.TypeOf((*MockConfiger)(nil).N)) } +// TimeoutPrepare mocks base method +func (m *MockConfiger) TimeoutPrepare() time.Duration { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TimeoutPrepare") + ret0, _ := ret[0].(time.Duration) + return ret0 +} + +// TimeoutPrepare indicates an expected call of TimeoutPrepare +func (mr *MockConfigerMockRecorder) TimeoutPrepare() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TimeoutPrepare", reflect.TypeOf((*MockConfiger)(nil).TimeoutPrepare)) +} + // TimeoutRequest mocks base method func (m *MockConfiger) TimeoutRequest() time.Duration { m.ctrl.T.Helper() diff --git a/core/internal/clientstate/client-state.go b/core/internal/clientstate/client-state.go index 2394aaaf..43d56205 100644 --- a/core/internal/clientstate/client-state.go +++ b/core/internal/clientstate/client-state.go @@ -93,6 +93,13 @@ func NewProvider(opts ...Option) Provider { // has not yet expired, it will be canceled and a new timer started. // // StopRequestTimer stops timer started by StartRequestTimer, if any. +// +// StartPrepareTimer starts a timer to expire after the duration of +// prepare timeout. The supplied callback function handleTimeout is +// invoked asynchronously upon timer expiration. If the previous timer +// has not yet expired, it will be canceled and a new timer started. +// +// StopPrepareTimer stops timer started by StartPrepareTimer, if any. type State interface { CaptureRequestSeq(seq uint64) (new bool, release func()) PrepareRequestSeq(seq uint64) (new bool, err error) @@ -103,6 +110,9 @@ type State interface { StartRequestTimer(handleTimeout func()) StopRequestTimer() + + StartPrepareTimer(handleTimeout func()) + StopPrepareTimer() } // New creates a new instance of client state representation. Optional @@ -117,6 +127,7 @@ func New(opts ...Option) State { s.seqState = newSeqState() s.replyState = newReplyState() s.requestTimerState = newRequestTimeoutState(&s.opts) + s.prepareTimerState = newPrepareTimeoutState(&s.opts) return s } @@ -127,11 +138,13 @@ type Option func(*options) type options struct { timerProvider timer.Provider requestTimeout func() time.Duration + prepareTimeout func() time.Duration } var defaultOptions = options{ timerProvider: timer.Standard(), requestTimeout: func() time.Duration { return time.Duration(0) }, + prepareTimeout: func() time.Duration { return time.Duration(0) }, } // WithTimerProvider specifies the abstract timer implementation to @@ -151,10 +164,20 @@ func WithRequestTimeout(timeout func() time.Duration) Option { } } +// WithPrepareTimeout specifies a function that returns the duration +// to use when starting a new prepare timeout timer. Zero or negative +// duration disables the timeout. The timeout is disabled by default. +func WithPrepareTimeout(timeout func() time.Duration) Option { + return func(opts *options) { + opts.prepareTimeout = timeout + } +} + type clientState struct { *seqState *replyState *requestTimerState + *prepareTimerState opts options } diff --git a/core/internal/clientstate/mocks/mock.go b/core/internal/clientstate/mocks/mock.go index 1c2e3448..17ad6797 100644 --- a/core/internal/clientstate/mocks/mock.go +++ b/core/internal/clientstate/mocks/mock.go @@ -35,6 +35,7 @@ func (m *MockState) EXPECT() *MockStateMockRecorder { // AddReply mocks base method func (m *MockState) AddReply(arg0 *messages.Reply) error { + m.ctrl.T.Helper() ret := m.ctrl.Call(m, "AddReply", arg0) ret0, _ := ret[0].(error) return ret0 @@ -42,11 +43,13 @@ func (m *MockState) AddReply(arg0 *messages.Reply) error { // AddReply indicates an expected call of AddReply func (mr *MockStateMockRecorder) AddReply(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddReply", reflect.TypeOf((*MockState)(nil).AddReply), arg0) } // CaptureRequestSeq mocks base method func (m *MockState) CaptureRequestSeq(arg0 uint64) (bool, func()) { + m.ctrl.T.Helper() ret := m.ctrl.Call(m, "CaptureRequestSeq", arg0) ret0, _ := ret[0].(bool) ret1, _ := ret[1].(func()) @@ -55,11 +58,13 @@ func (m *MockState) CaptureRequestSeq(arg0 uint64) (bool, func()) { // CaptureRequestSeq indicates an expected call of CaptureRequestSeq func (mr *MockStateMockRecorder) CaptureRequestSeq(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CaptureRequestSeq", reflect.TypeOf((*MockState)(nil).CaptureRequestSeq), arg0) } // PrepareRequestSeq mocks base method func (m *MockState) PrepareRequestSeq(arg0 uint64) (bool, error) { + m.ctrl.T.Helper() ret := m.ctrl.Call(m, "PrepareRequestSeq", arg0) ret0, _ := ret[0].(bool) ret1, _ := ret[1].(error) @@ -68,11 +73,13 @@ func (m *MockState) PrepareRequestSeq(arg0 uint64) (bool, error) { // PrepareRequestSeq indicates an expected call of PrepareRequestSeq func (mr *MockStateMockRecorder) PrepareRequestSeq(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PrepareRequestSeq", reflect.TypeOf((*MockState)(nil).PrepareRequestSeq), arg0) } // ReplyChannel mocks base method func (m *MockState) ReplyChannel(arg0 uint64) <-chan *messages.Reply { + m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ReplyChannel", arg0) ret0, _ := ret[0].(<-chan *messages.Reply) return ret0 @@ -80,11 +87,13 @@ func (m *MockState) ReplyChannel(arg0 uint64) <-chan *messages.Reply { // ReplyChannel indicates an expected call of ReplyChannel func (mr *MockStateMockRecorder) ReplyChannel(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReplyChannel", reflect.TypeOf((*MockState)(nil).ReplyChannel), arg0) } // RetireRequestSeq mocks base method func (m *MockState) RetireRequestSeq(arg0 uint64) (bool, error) { + m.ctrl.T.Helper() ret := m.ctrl.Call(m, "RetireRequestSeq", arg0) ret0, _ := ret[0].(bool) ret1, _ := ret[1].(error) @@ -93,25 +102,54 @@ func (m *MockState) RetireRequestSeq(arg0 uint64) (bool, error) { // RetireRequestSeq indicates an expected call of RetireRequestSeq func (mr *MockStateMockRecorder) RetireRequestSeq(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RetireRequestSeq", reflect.TypeOf((*MockState)(nil).RetireRequestSeq), arg0) } +// StartPrepareTimer mocks base method +func (m *MockState) StartPrepareTimer(arg0 func()) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "StartPrepareTimer", arg0) +} + +// StartPrepareTimer indicates an expected call of StartPrepareTimer +func (mr *MockStateMockRecorder) StartPrepareTimer(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartPrepareTimer", reflect.TypeOf((*MockState)(nil).StartPrepareTimer), arg0) +} + // StartRequestTimer mocks base method func (m *MockState) StartRequestTimer(arg0 func()) { + m.ctrl.T.Helper() m.ctrl.Call(m, "StartRequestTimer", arg0) } // StartRequestTimer indicates an expected call of StartRequestTimer func (mr *MockStateMockRecorder) StartRequestTimer(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartRequestTimer", reflect.TypeOf((*MockState)(nil).StartRequestTimer), arg0) } +// StopPrepareTimer mocks base method +func (m *MockState) StopPrepareTimer() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "StopPrepareTimer") +} + +// StopPrepareTimer indicates an expected call of StopPrepareTimer +func (mr *MockStateMockRecorder) StopPrepareTimer() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StopPrepareTimer", reflect.TypeOf((*MockState)(nil).StopPrepareTimer)) +} + // StopRequestTimer mocks base method func (m *MockState) StopRequestTimer() { + m.ctrl.T.Helper() m.ctrl.Call(m, "StopRequestTimer") } // StopRequestTimer indicates an expected call of StopRequestTimer func (mr *MockStateMockRecorder) StopRequestTimer() *gomock.Call { + mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StopRequestTimer", reflect.TypeOf((*MockState)(nil).StopRequestTimer)) } diff --git a/core/internal/clientstate/prepare-timeout.go b/core/internal/clientstate/prepare-timeout.go new file mode 100644 index 00000000..d4ede1f7 --- /dev/null +++ b/core/internal/clientstate/prepare-timeout.go @@ -0,0 +1,63 @@ +// Copyright (c) 2019 NEC Solution Innovators, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package clientstate + +import ( + "fmt" + "sync" + "time" + + "github.com/hyperledger-labs/minbft/core/internal/timer" +) + +type prepareTimerState struct { + sync.Mutex + + prepareTimer timer.Timer + + opts *options +} + +func newPrepareTimeoutState(opts *options) *prepareTimerState { + return &prepareTimerState{opts: opts} +} + +func (s *prepareTimerState) StartPrepareTimer(forward func()) { + s.Lock() + defer s.Unlock() + + timerProvider := s.opts.timerProvider + timeout := s.opts.prepareTimeout() + + if s.prepareTimer != nil { + s.prepareTimer.Stop() + } + + if timeout <= time.Duration(0) { + return + } + + fmt.Printf("start forward timer: timeout = %d\n", timeout) + s.prepareTimer = timerProvider.AfterFunc(timeout, forward) +} + +func (s *prepareTimerState) StopPrepareTimer() { + s.Lock() + defer s.Unlock() + + if s.prepareTimer != nil { + s.prepareTimer.Stop() + } +} diff --git a/core/internal/clientstate/prepare-timeout_test.go b/core/internal/clientstate/prepare-timeout_test.go new file mode 100644 index 00000000..72ef8c48 --- /dev/null +++ b/core/internal/clientstate/prepare-timeout_test.go @@ -0,0 +1,117 @@ +// Copyright (c) 2019 NEC Solution Innovators, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package clientstate + +import ( + "testing" + "time" + + "github.com/golang/mock/gomock" + + "github.com/stretchr/testify/assert" + testifymock "github.com/stretchr/testify/mock" + + "github.com/hyperledger-labs/minbft/core/internal/timer" + + timermock "github.com/hyperledger-labs/minbft/core/internal/timer/mock" +) + +func TestPrepareTimeout(t *testing.T) { + t.Run("Start", testStartPrepareTimeout) + t.Run("Stop", testStopPrepareTimeout) +} + +func testStartPrepareTimeout(t *testing.T) { + mock := new(testifymock.Mock) + defer mock.AssertExpectations(t) + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + s, timerProvider, handleTimeout := setupPrepareTimeoutMock(mock, ctrl) + + // Start with disabled prepare timeout + mock.On("prepareTimeout").Return(time.Duration(0)).Once() + s.StartPrepareTimer(handleTimeout) + + // Start with enabled prepare timeout + timeout := randTimeout() + mock.On("prepareTimeout").Return(timeout).Once() + mockTimer := timermock.NewMockTimer(ctrl) + timerProvider.EXPECT().AfterFunc(timeout, gomock.Any()).DoAndReturn( + func(d time.Duration, f func()) timer.Timer { + f() + return mockTimer + }, + ) + mock.On("prepareTimeoutHandler").Once() + s.StartPrepareTimer(handleTimeout) + + // Restart prepare timeout + mockTimer.EXPECT().Stop() + timeout = randTimeout() + mock.On("prepareTimeout").Return(timeout).Once() + mockTimer = timermock.NewMockTimer(ctrl) + timerProvider.EXPECT().AfterFunc(timeout, gomock.Any()).DoAndReturn( + func(d time.Duration, f func()) timer.Timer { + f() + return mockTimer + }, + ) + mock.On("prepareTimeoutHandler").Once() + s.StartPrepareTimer(handleTimeout) +} + +func testStopPrepareTimeout(t *testing.T) { + mock := new(testifymock.Mock) + defer mock.AssertExpectations(t) + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + s, timerProvider, handleTimeout := setupPrepareTimeoutMock(mock, ctrl) + + timeout := randTimeout() + mock.On("prepareTimeout").Return(timeout) + + // Stop before started + assert.NotPanics(t, func() { + s.StopPrepareTimer() + }) + + // Start and stop + mockTimer := timermock.NewMockTimer(ctrl) + timerProvider.EXPECT().AfterFunc(timeout, gomock.Any()).Return(mockTimer) + s.StartPrepareTimer(handleTimeout) + mockTimer.EXPECT().Stop() + s.StopPrepareTimer() + + // Stop again + mockTimer.EXPECT().Stop().AnyTimes() + s.StopPrepareTimer() +} + +func setupPrepareTimeoutMock(mock *testifymock.Mock, ctrl *gomock.Controller) (state State, timerProvider *timermock.MockProvider, handleTimeout func()) { + handleTimeout = func() { + mock.MethodCalled("prepareTimeoutHandler") + } + prepareTimeout := func() time.Duration { + args := mock.MethodCalled("prepareTimeout") + return args.Get(0).(time.Duration) + } + timerProvider = timermock.NewMockProvider(ctrl) + state = New(WithPrepareTimeout(prepareTimeout), WithTimerProvider(timerProvider)) + return state, timerProvider, handleTimeout +} diff --git a/core/message-handling.go b/core/message-handling.go index d2ae4de7..4b456228 100644 --- a/core/message-handling.go +++ b/core/message-handling.go @@ -158,6 +158,7 @@ func defaultIncomingMessageHandler(id uint32, log messagelog.MessageLog, config handleReqTimeout := func(view uint64) { logger.Panic("Request timed out, but view change not implemented") } + prepTimeout := makePrepareTimeoutProvider(config) verifyMessageSignature := makeMessageSignatureVerifier(stack) signMessage := makeReplicaMessageSigner(stack) @@ -166,6 +167,7 @@ func defaultIncomingMessageHandler(id uint32, log messagelog.MessageLog, config clientStateOpts := []clientstate.Option{ clientstate.WithRequestTimeout(reqTimeout), + clientstate.WithPrepareTimeout(prepTimeout), } clientStates := clientstate.NewProvider(clientStateOpts...) peerStates := peerstate.NewProvider() @@ -206,10 +208,13 @@ func defaultIncomingMessageHandler(id uint32, log messagelog.MessageLog, config validateCommit := makeCommitValidator(verifyUI, validatePrepare) validateMessage := makeMessageValidator(validateRequest, validatePrepare, validateCommit) + startPrepTimer := makePrepareTimerStarter(clientStates, consumeGeneratedMessage, logger) + stopPrepTimer := makePrepareTimerStopper(clientStates) + applyCommit := makeCommitApplier(collectCommitment) - applyPrepare := makePrepareApplier(id, prepareSeq, collectCommitment, handleGeneratedUIMessage) + applyPrepare := makePrepareApplier(id, prepareSeq, collectCommitment, handleGeneratedUIMessage, stopPrepTimer, startReqTimer) applyReplicaMessage = makeReplicaMessageApplier(applyPrepare, applyCommit) - applyRequest := makeRequestApplier(id, n, provideView, handleGeneratedUIMessage, startReqTimer) + applyRequest := makeRequestApplier(id, n, provideView, handleGeneratedUIMessage, startPrepTimer) var processMessage messageProcessor @@ -538,7 +543,7 @@ func makeGeneratedMessageConsumer(log messagelog.MessageLog, provider clientstat // Erroneous Reply must never be supplied panic(fmt.Errorf("Failed to consume generated Reply: %s", err)) } - case *messages.Prepare, *messages.Commit: + case *messages.Request, *messages.Prepare, *messages.Commit: log.Append(messages.WrapMessage(msg)) default: panic("Unknown message type") diff --git a/core/prepare.go b/core/prepare.go index c41b9a54..4d7f03b6 100644 --- a/core/prepare.go +++ b/core/prepare.go @@ -64,12 +64,17 @@ func makePrepareValidator(n uint32, verifyUI uiVerifier, validateRequest request // makePrepareApplier constructs an instance of prepareApplier using // id as the current replica ID, and the supplied abstract interfaces. -func makePrepareApplier(id uint32, prepareSeq requestSeqPreparer, collectCommitment commitmentCollector, handleGeneratedUIMessage generatedUIMessageHandler) prepareApplier { +func makePrepareApplier(id uint32, prepareSeq requestSeqPreparer, collectCommitment commitmentCollector, handleGeneratedUIMessage generatedUIMessageHandler, stopPrepTimer prepareTimerStopper, startReqTimer requestTimerStarter) prepareApplier { return func(prepare *messages.Prepare) error { - if new := prepareSeq(prepare.Msg.Request); !new { + request := prepare.Msg.Request + if new := prepareSeq(request); !new { return fmt.Errorf("Request already prepared") } + // TODO: we'll need view check here when view-change is implemented. + stopPrepTimer(prepare.Msg.ReplicaId) + startReqTimer(request.Msg.ClientId, prepare.Msg.View) + primaryID := prepare.ReplicaID() if err := collectCommitment(primaryID, prepare); err != nil { @@ -85,7 +90,7 @@ func makePrepareApplier(id uint32, prepareSeq requestSeqPreparer, collectCommitm View: prepare.Msg.View, ReplicaId: id, PrimaryId: primaryID, - Request: prepare.Msg.Request, + Request: request, PrimaryUi: prepare.UIBytes(), }, } diff --git a/core/prepare_test.go b/core/prepare_test.go index 0abd9dc5..2cad600e 100644 --- a/core/prepare_test.go +++ b/core/prepare_test.go @@ -103,16 +103,25 @@ func TestMakePrepareApplier(t *testing.T) { handleGeneratedUIMessage := func(msg messages.MessageWithUI) { mock.MethodCalled("generatedUIMessageHandler", msg) } - apply := makePrepareApplier(id, prepareRequestSeq, collectCommitment, handleGeneratedUIMessage) + stopPrepTimer := func(id uint32) { + mock.MethodCalled("prepareTimerStopper", id) + } + startReqTimer := func(clientID uint32, view uint64) { + mock.MethodCalled("requestTimerStarter", clientID, view) + } + apply := makePrepareApplier(id, prepareRequestSeq, collectCommitment, handleGeneratedUIMessage, stopPrepTimer, startReqTimer) + clientID := rand.Uint32() + vfp := viewForPrimary(n, id) request := &messages.Request{ Msg: &messages.Request_M{ + ClientId: clientID, Seq: rand.Uint64(), }, } ownPrepare := &messages.Prepare{ Msg: &messages.Prepare_M{ - View: viewForPrimary(n, id), + View: vfp, ReplicaId: id, Request: request, }, @@ -142,21 +151,29 @@ func TestMakePrepareApplier(t *testing.T) { assert.Error(t, err, "Request ID already prepared") mock.On("requestSeqPreparer", request).Return(true).Once() + mock.On("prepareTimerStopper", id).Once() + mock.On("requestTimerStarter", clientID, vfp).Once() mock.On("commitmentCollector", id, ownPrepare).Return(fmt.Errorf("Error")).Once() err = apply(ownPrepare) assert.Error(t, err, "Failed to collect commitment") mock.On("requestSeqPreparer", request).Return(true).Once() + mock.On("prepareTimerStopper", id).Once() + mock.On("requestTimerStarter", clientID, vfp).Once() mock.On("commitmentCollector", id, ownPrepare).Return(nil).Once() err = apply(ownPrepare) assert.NoError(t, err) mock.On("requestSeqPreparer", request).Return(true).Once() + mock.On("prepareTimerStopper", primary).Once() + mock.On("requestTimerStarter", clientID, view).Once() mock.On("commitmentCollector", primary, prepare).Return(fmt.Errorf("Error")).Once() err = apply(prepare) assert.Error(t, err, "Failed to collect commitment") mock.On("requestSeqPreparer", request).Return(true).Once() + mock.On("prepareTimerStopper", primary).Once() + mock.On("requestTimerStarter", clientID, view).Once() mock.On("commitmentCollector", primary, prepare).Return(nil).Once() mock.On("generatedUIMessageHandler", commit).Once() err = apply(prepare) diff --git a/core/request.go b/core/request.go index df9ec1f3..5735e77f 100644 --- a/core/request.go +++ b/core/request.go @@ -124,6 +124,22 @@ type requestTimeoutHandler func(view uint64) // requestTimeoutProvider returns current request timeout duration. type requestTimeoutProvider func() time.Duration +// prepareTimerStarter starts prepare timer. +// +// A prepare timeout event is triggered if the prepare timeout elapses +// before corresponding prepareTimerStopper is called with the same +// replicaID passed. The argument view specifies the view derived from +// the request message. It is allowed to restart a timer before the previous +// corresponding timer has stopped or expired. It is safe to invoke +// concurrently. +type prepareTimerStarter func(request *messages.Request, view uint64) + +// prepareTimerStopper stops prepare timer. +type prepareTimerStopper func(replicaID uint32) + +// prepareTimeoutProvider returns current prepare timeout duration. +type prepareTimeoutProvider func() time.Duration + // makeRequestValidator constructs an instance of requestValidator // using the supplied abstractions. func makeRequestValidator(verify messageSignatureVerifier) requestValidator { @@ -151,18 +167,12 @@ func makeRequestProcessor(captureSeq requestSeqCapturer, applyRequest requestApp } } -func makeRequestApplier(id, n uint32, provideView viewProvider, handleGeneratedUIMessage generatedUIMessageHandler, startReqTimer requestTimerStarter) requestApplier { +func makeRequestApplier(id, n uint32, provideView viewProvider, handleGeneratedUIMessage generatedUIMessageHandler, startPrepTimer prepareTimerStarter) requestApplier { return func(request *messages.Request) error { view, releaseView := provideView() defer releaseView() - // The primary has to start request timer, as well. - // Suppose, the primary is correct, but its messages - // are delayed, and other replicas switch to a new - // view. In that case, other replicas might rely on - // this correct replica to trigger another view - // change, should the new primary be faulty. - startReqTimer(request.Msg.ClientId, view) + startPrepTimer(request, view) if isPrimary(view, id, n) { prepare := &messages.Prepare{ @@ -305,3 +315,32 @@ func makeRequestTimeoutProvider(config api.Configer) requestTimeoutProvider { return config.TimeoutRequest() } } + +// makePrepareTimerStarter constructs an instance of +// prepareTimerStarter. +func makePrepareTimerStarter(provideClientState clientstate.Provider, consume generatedMessageConsumer, logger *logging.Logger) prepareTimerStarter { + return func(request *messages.Request, view uint64) { + clientID := request.Msg.ClientId + + provideClientState(clientID).StartPrepareTimer(func() { + logger.Infof("Prepare timer expired: client=%d view=%d", clientID, view) + consume(request) + }) + } +} + +// makePrepareTimerStopper constructs an instance of +// prepareTimerStopper. +func makePrepareTimerStopper(provideClientState clientstate.Provider) prepareTimerStopper { + return func(clientID uint32) { + provideClientState(clientID).StopPrepareTimer() + } +} + +// makePrepareTimeoutProvider constructs an instance of +// prepareTimeoutProvider. +func makePrepareTimeoutProvider(config api.Configer) prepareTimeoutProvider { + return func() time.Duration { + return config.TimeoutPrepare() + } +} diff --git a/core/request_test.go b/core/request_test.go index 4d138bc1..093adac4 100644 --- a/core/request_test.go +++ b/core/request_test.go @@ -95,10 +95,10 @@ func TestMakeRequestApplier(t *testing.T) { handleGeneratedUIMessage := func(msg messages.MessageWithUI) { mock.MethodCalled("generatedUIMessageHandler", msg) } - startReqTimer := func(clientID uint32, view uint64) { - mock.MethodCalled("requestTimerStarter", clientID, view) + startPrepTimer := func(msg *messages.Request, view uint64) { + mock.MethodCalled("prepareTimerStarter", msg, view) } - apply := makeRequestApplier(id, n, provideView, handleGeneratedUIMessage, startReqTimer) + apply := makeRequestApplier(id, n, provideView, handleGeneratedUIMessage, startPrepTimer) clientID := rand.Uint32() request := &messages.Request{ @@ -116,13 +116,13 @@ func TestMakeRequestApplier(t *testing.T) { } mock.On("viewProvider").Return(otherView).Once() - mock.On("requestTimerStarter", clientID, otherView).Once() + mock.On("prepareTimerStarter", request, otherView).Once() mock.On("viewReleaser", otherView).Once() err := apply(request) assert.NoError(t, err) mock.On("viewProvider").Return(ownView).Once() - mock.On("requestTimerStarter", clientID, ownView).Once() + mock.On("prepareTimerStarter", request, ownView).Once() mock.On("generatedUIMessageHandler", prepare).Once() mock.On("viewReleaser", ownView).Once() err = apply(request) diff --git a/messages/messages.go b/messages/messages.go index bece9d87..6f7116a0 100644 --- a/messages/messages.go +++ b/messages/messages.go @@ -93,6 +93,12 @@ func (m *Request) ClientID() uint32 { return m.GetMsg().GetClientId() } +// ReplicaID returns ID of the replica forwarding the message +func (m *Request) ReplicaID() uint32 { + // TODO: we have to determine the proper behavior. + return 0 +} + // Payload returns serialized message data, except signature func (m *Request) Payload() []byte { mBytes, err := proto.Marshal(m.GetMsg()) @@ -113,6 +119,11 @@ func (m *Request) AttachSignature(signature []byte) { m.Signature = signature } +// EmbeddedMessages attaches a list of messages embedded into this one +func (m *Request) EmbeddedMessages() []interface{} { + return nil +} + // ReplicaID returns ID of the replica created the message func (m *Reply) ReplicaID() uint32 { return m.Msg.GetReplicaId() diff --git a/sample/config/consensus.yaml b/sample/config/consensus.yaml index 639f7f30..ae1cc81e 100644 --- a/sample/config/consensus.yaml +++ b/sample/config/consensus.yaml @@ -19,6 +19,9 @@ protocol: # Request processing timeout (triggers view change) request: 2s + # Prepare processing timeout (forwards request if expired) + prepare: 1s + # Initial view change timeout (triggers another view change) viewchange: 3s diff --git a/sample/config/viperconfiger.go b/sample/config/viperconfiger.go index 6d244be5..9e5402eb 100644 --- a/sample/config/viperconfiger.go +++ b/sample/config/viperconfiger.go @@ -55,6 +55,7 @@ type Peer struct { // // timeout: // request: 2s +// prepare: 1s // viewchange: 3s // peers: // - id: 0 @@ -148,6 +149,11 @@ func (c *ViperConfiger) TimeoutRequest() time.Duration { return c.getTimeDuration("protocol.timeout.request") } +// TimeoutPrepare returns the timeout to forward REQUEST message +func (c *ViperConfiger) TimeoutPrepare() time.Duration { + return c.getTimeDuration("protocol.timeout.prepare") +} + // TimeoutViewChange returns the timeout to receive NEW-VIEW message func (c *ViperConfiger) TimeoutViewChange() time.Duration { return c.getTimeDuration("protocol.timeout.viewchange")