
Commit

separate timer queue ack manager into a separate file, add functionality to timer queue processor to be cluster aware

rename SequenceID -> TimerSequenceID
Wenquan Xing committed Mar 24, 2018
1 parent bdeb6d4 commit 1e70506
Showing 21 changed files with 1,429 additions and 423 deletions.
5 changes: 5 additions & 0 deletions common/cluster/metadataTestBase.go
@@ -34,6 +34,11 @@ const (
var (
// TestAllClusterNames is the all cluster names used for test
TestAllClusterNames = []string{TestCurrentClusterName, TestAlternativeClusterName}
// TestAllClusterNamesMap is the same as above, just convenient for test mocking
TestAllClusterNamesMap = map[string]bool{
TestCurrentClusterName: true,
TestAlternativeClusterName: true,
}
)

// GetTestClusterMetadata return an cluster metadata instance, which is initialized
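The new map exists purely so cluster metadata mocks can return it directly. Below is a minimal sketch of that mocking, assuming the existing common/mocks.ClusterMetadata mock; the helper name is hypothetical, and the engine test setups further down in this commit inline the same two stubs.

package history

import (
	"github.com/uber/cadence/common/cluster"
	"github.com/uber/cadence/common/mocks"
)

// newTestClusterMetadata is a hypothetical helper, not part of the commit.
// It stubs the two cluster metadata calls the queue processors rely on.
func newTestClusterMetadata() *mocks.ClusterMetadata {
	m := &mocks.ClusterMetadata{}
	m.On("GetCurrentClusterName").Return(cluster.TestCurrentClusterName)
	m.On("GetAllClusterNames").Return(cluster.TestAllClusterNamesMap)
	return m
}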
6 changes: 4 additions & 2 deletions common/metrics/defs.go
@@ -615,7 +615,8 @@ const (
ScheduleToStartTimeoutCounter
StartToCloseTimeoutCounter
ScheduleToCloseTimeoutCounter
NewTimerCounter
NewActiveTimerCounter
NewStandbyTimerCounter
NewTimerNotifyCounter
AcquireShardsCounter
AcquireShardsLatency
@@ -707,7 +708,8 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
ScheduleToStartTimeoutCounter: {metricName: "schedule-to-start-timeout", metricType: Counter},
StartToCloseTimeoutCounter: {metricName: "start-to-close-timeout", metricType: Counter},
ScheduleToCloseTimeoutCounter: {metricName: "schedule-to-close-timeout", metricType: Counter},
NewTimerCounter: {metricName: "new-timer", metricType: Counter},
NewActiveTimerCounter: {metricName: "new-active-timer", metricType: Counter},
NewStandbyTimerCounter: {metricName: "new-standby-timer", metricType: Counter},
NewTimerNotifyCounter: {metricName: "new-timer-notifications", metricType: Counter},
AcquireShardsCounter: {metricName: "acquire-shards-count", metricType: Counter},
AcquireShardsLatency: {metricName: "acquire-shards-latency", metricType: Timer},
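The single new-timer counter is split into active and standby variants so the cluster-aware timer queue processor can report the two paths separately. A minimal sketch of choosing between them, assuming the existing metrics.Client.IncCounter API and TimerQueueProcessorScope constant; the helper and its isActive flag are hypothetical.

package history

import "github.com/uber/cadence/common/metrics"

// emitNewTimerCounter is a hypothetical helper, not part of the commit: it
// bumps the active or standby counter depending on which path handled the timer.
func emitNewTimerCounter(client metrics.Client, isActive bool) {
	if isActive {
		client.IncCounter(metrics.TimerQueueProcessorScope, metrics.NewActiveTimerCounter)
	} else {
		client.IncCounter(metrics.TimerQueueProcessorScope, metrics.NewStandbyTimerCounter)
	}
}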
97 changes: 97 additions & 0 deletions service/history/MockTimerQueueAckMgr.go
@@ -0,0 +1,97 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package history

import (
"time"

"github.com/stretchr/testify/mock"
"github.com/uber/cadence/common/persistence"
)

// MockTimerQueueAckMgr is used as mock implementation for TimerQueueAckMgr
type MockTimerQueueAckMgr struct {
mock.Mock
}

// readTimerTasks is mock implementation for readTimerTasks of TimerQueueAckMgr
func (_m *MockTimerQueueAckMgr) readTimerTasks(clusterName string) ([]*persistence.TimerTaskInfo, *persistence.TimerTaskInfo, bool, error) {
ret := _m.Called(clusterName)

var r0 []*persistence.TimerTaskInfo
if rf, ok := ret.Get(0).(func(string) []*persistence.TimerTaskInfo); ok {
r0 = rf(clusterName)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*persistence.TimerTaskInfo)
}
}

var r1 *persistence.TimerTaskInfo
if rf, ok := ret.Get(1).(func(string) *persistence.TimerTaskInfo); ok {
r1 = rf(clusterName)
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).(*persistence.TimerTaskInfo)
}
}

var r2 bool
if rf, ok := ret.Get(2).(func(string) bool); ok {
r2 = rf(clusterName)
} else {
r2 = ret.Get(2).(bool)
}

var r3 error
if rf, ok := ret.Get(3).(func(string) error); ok {
r3 = rf(clusterName)
} else {
r3 = ret.Error(3)
}

return r0, r1, r2, r3
}

func (_m *MockTimerQueueAckMgr) retryTimerTask(timerTask *persistence.TimerTaskInfo) {
_m.Called(timerTask)
}

func (_m *MockTimerQueueAckMgr) completeTimerTask(taskID TimerSequenceID) {
_m.Called(taskID)
}

func (_m *MockTimerQueueAckMgr) updateAckLevel(clusterName string) {
_m.Called(clusterName)
}

func (_m *MockTimerQueueAckMgr) isProcessNow(expiryTime time.Time) bool {
ret := _m.Called(expiryTime)

var r0 bool
if rf, ok := ret.Get(0).(func(time.Time) bool); ok {
r0 = rf(expiryTime)
} else {
r0 = ret.Get(0).(bool)
}

return r0
}
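Not part of the commit, but a minimal sketch of how this mock might be stubbed in a test, assuming the cluster test constants above and a TimerTaskInfo carrying only a TaskID.

package history

import (
	"testing"

	"github.com/uber/cadence/common/cluster"
	"github.com/uber/cadence/common/persistence"
)

// TestReadTimerTasksStub is a hypothetical usage example for MockTimerQueueAckMgr.
func TestReadTimerTasksStub(t *testing.T) {
	ackMgr := &MockTimerQueueAckMgr{}
	tasks := []*persistence.TimerTaskInfo{{TaskID: 1}}

	// One read for the current cluster: a single task, no lookahead timer, no more pages.
	ackMgr.On("readTimerTasks", cluster.TestCurrentClusterName).Return(tasks, nil, false, nil).Once()

	got, lookAhead, more, err := ackMgr.readTimerTasks(cluster.TestCurrentClusterName)
	if err != nil || lookAhead != nil || more || len(got) != 1 {
		t.Fatalf("unexpected result: %v %v %v %v", got, lookAhead, more, err)
	}
	ackMgr.AssertExpectations(t)
}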
3 changes: 1 addition & 2 deletions service/history/handler.go
@@ -137,8 +137,7 @@ func (h *Handler) Stop() {

// CreateEngine is implementation for HistoryEngineFactory used for creating the engine instance for shard
func (h *Handler) CreateEngine(context ShardContext) Engine {
return NewEngineWithShardContext(context, context.GetDomainCache(), h.visibilityMgr,
h.matchingServiceClient, h.historyServiceClient, h.historyEventNotifier)
return NewEngineWithShardContext(context, h.visibilityMgr, h.matchingServiceClient, h.historyServiceClient, h.historyEventNotifier)
}

// Health is for health check
18 changes: 7 additions & 11 deletions service/history/historyEngine.go
@@ -35,7 +35,6 @@ import (
hc "github.com/uber/cadence/client/history"
"github.com/uber/cadence/client/matching"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/logging"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
@@ -59,7 +58,6 @@ type (
tokenSerializer common.TaskTokenSerializer
hSerializerFactory persistence.HistorySerializerFactory
historyCache *historyCache
domainCache cache.DomainCache
metricsClient metrics.Client
logger bark.Logger
}
@@ -105,9 +103,8 @@ var (
)

// NewEngineWithShardContext creates an instance of history engine
func NewEngineWithShardContext(shard ShardContext, domainCache cache.DomainCache,
visibilityMgr persistence.VisibilityManager, matching matching.Client, historyClient hc.Client,
historyEventNotifier historyEventNotifier) Engine {
func NewEngineWithShardContext(shard ShardContext, visibilityMgr persistence.VisibilityManager,
matching matching.Client, historyClient hc.Client, historyEventNotifier historyEventNotifier) Engine {
shardWrapper := &shardContextWrapper{
ShardContext: shard,
historyEventNotifier: historyEventNotifier,
@@ -124,7 +121,6 @@ func NewEngineWithShardContext(shard ShardContext, domainCache cache.DomainCache
tokenSerializer: common.NewJSONTaskTokenSerializer(),
hSerializerFactory: persistence.NewHistorySerializerFactory(),
historyCache: historyCache,
domainCache: domainCache,
logger: logger.WithFields(bark.Fields{
logging.TagWorkflowComponent: logging.TagValueHistoryEngineComponent,
}),
@@ -848,7 +844,7 @@ Update_History_Loop:
// First check if we need to use a different target domain to schedule activity
if attributes.Domain != nil {
// TODO: Error handling for ActivitySchedule failed when domain lookup fails
domainEntry, err := e.domainCache.GetDomain(*attributes.Domain)
domainEntry, err := e.shard.GetDomainCache().GetDomain(*attributes.Domain)
if err != nil {
return &workflow.InternalServiceError{Message: "Unable to schedule activity across domain."}
}
@@ -1027,7 +1023,7 @@ Update_History_Loop:
if attributes.GetDomain() == "" {
foreignDomainID = msBuilder.executionInfo.DomainID
} else {
foreignDomainEntry, err := e.domainCache.GetDomain(attributes.GetDomain())
foreignDomainEntry, err := e.shard.GetDomainCache().GetDomain(attributes.GetDomain())
if err != nil {
return &workflow.InternalServiceError{
Message: fmt.Sprintf("Unable to cancel workflow across domain: %v.", attributes.GetDomain())}
@@ -1065,7 +1061,7 @@ Update_History_Loop:
if attributes.GetDomain() == "" {
foreignDomainID = msBuilder.executionInfo.DomainID
} else {
foreignDomainEntry, err := e.domainCache.GetDomain(attributes.GetDomain())
foreignDomainEntry, err := e.shard.GetDomainCache().GetDomain(attributes.GetDomain())
if err != nil {
return &workflow.InternalServiceError{
Message: fmt.Sprintf("Unable to signal workflow across domain: %v.", attributes.GetDomain())}
@@ -1140,7 +1136,7 @@ Update_History_Loop:
// First check if we need to use a different target domain to schedule child execution
if attributes.Domain != nil {
// TODO: Error handling for DecisionType_StartChildWorkflowExecution failed when domain lookup fails
domainEntry, err := e.domainCache.GetDomain(*attributes.Domain)
domainEntry, err := e.shard.GetDomainCache().GetDomain(*attributes.Domain)
if err != nil {
return &workflow.InternalServiceError{Message: "Unable to schedule child execution across domain."}
}
@@ -1817,7 +1813,7 @@ func (e *historyEngineImpl) getDeleteWorkflowTasks(

// Generate a timer task to cleanup history events for this workflow execution
var retentionInDays int32
domainEntry, err := e.domainCache.GetDomainByID(domainID)
domainEntry, err := e.shard.GetDomainCache().GetDomainByID(domainID)
if err != nil {
if _, ok := err.(*workflow.EntityNotExistsError); !ok {
return nil, nil, err
6 changes: 5 additions & 1 deletion service/history/historyEngine2_test.go
@@ -40,6 +40,7 @@ import (
workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/messaging"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/mocks"
@@ -134,12 +135,15 @@ func (s *engine2Suite) SetupTest() {
executionManager: s.mockExecutionMgr,
historyMgr: s.mockHistoryMgr,
historyCache: historyCache,
domainCache: domainCache,
logger: s.logger,
metricsClient: metrics.NewClient(tally.NoopScope, metrics.History),
tokenSerializer: common.NewJSONTaskTokenSerializer(),
hSerializerFactory: persistence.NewHistorySerializerFactory(),
}

// set up the basics of cluster metadata, since the cluster metadata is used during the initialization of the queue processors
s.mockClusterMetadata.On("GetCurrentClusterName").Return(cluster.TestCurrentClusterName)
s.mockClusterMetadata.On("GetAllClusterNames").Return(cluster.TestAllClusterNamesMap)
h.txProcessor = newTransferQueueProcessor(mockShard, h, s.mockVisibilityMgr, s.mockMatchingClient, s.mockHistoryClient)
h.timerProcessor = newTimerQueueProcessor(mockShard, h, s.mockExecutionMgr, s.logger)
s.historyEngine = h
8 changes: 8 additions & 0 deletions service/history/historyEngineInterfaces.go
@@ -90,6 +90,14 @@ type (
NotifyNewTimers(timerTask []persistence.Task)
}

timerQueueAckMgr interface {
readTimerTasks(clusterName string) ([]*persistence.TimerTaskInfo, *persistence.TimerTaskInfo, bool, error)
retryTimerTask(timerTask *persistence.TimerTaskInfo)
completeTimerTask(taskID TimerSequenceID)
updateAckLevel(clusterName string)
isProcessNow(time.Time) bool
}

historyEventNotifier interface {
common.Daemon
NotifyNewHistoryEvent(event *historyEventNotification)
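The timerQueueAckMgr interface is what lets the timer queue processor drive one ack manager per cluster. Below is a hypothetical consumer loop, purely to illustrate the intended call order; the process callback and the TimerSequenceID field names are assumptions, and the real processor waits on the lookahead timer rather than skipping unfired tasks.

package history

import "github.com/uber/cadence/common/persistence"

// processTimerTasksOnce is a hypothetical sketch, not the actual processor:
// it drains one page of timer tasks for a cluster through timerQueueAckMgr.
func processTimerTasksOnce(ackMgr timerQueueAckMgr, clusterName string,
	process func(*persistence.TimerTaskInfo) error) error {

	tasks, _, _, err := ackMgr.readTimerTasks(clusterName)
	if err != nil {
		return err
	}
	for _, task := range tasks {
		if !ackMgr.isProcessNow(task.VisibilityTimestamp) {
			continue // not fired yet; the real processor waits on the lookahead timer
		}
		if err := process(task); err != nil {
			ackMgr.retryTimerTask(task) // hand back for a later retry
			continue
		}
		// Field names assumed from the SequenceID -> TimerSequenceID rename.
		ackMgr.completeTimerTask(TimerSequenceID{VisibilityTimestamp: task.VisibilityTimestamp, TaskID: task.TaskID})
	}
	// Move this cluster's ack level forward past the completed work.
	ackMgr.updateAckLevel(clusterName)
	return nil
}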
7 changes: 5 additions & 2 deletions service/history/historyEngine_test.go
@@ -119,7 +119,7 @@ func (s *engineSuite) SetupTest() {
return len(workflowID)
},
)
domainCache := cache.NewDomainCache(s.mockMetadataMgr, cluster.GetTestClusterMetadata(false, false), s.logger)
domainCache := cache.NewDomainCache(s.mockMetadataMgr, s.mockClusterMetadata, s.logger)
mockShard := &shardContextImpl{
service: s.mockService,
shardInfo: &persistence.ShardInfo{ShardID: shardID, RangeID: 1, TransferAckLevel: 0},
@@ -145,13 +145,16 @@ func (s *engineSuite) SetupTest() {
executionManager: s.mockExecutionMgr,
historyMgr: s.mockHistoryMgr,
historyCache: historyCache,
domainCache: domainCache,
logger: s.logger,
metricsClient: metrics.NewClient(tally.NoopScope, metrics.History),
tokenSerializer: common.NewJSONTaskTokenSerializer(),
hSerializerFactory: persistence.NewHistorySerializerFactory(),
historyEventNotifier: historyEventNotifier,
}

// set up the basics of cluster metadata, since the cluster metadata is used during the initialization of the queue processors
s.mockClusterMetadata.On("GetCurrentClusterName").Return(cluster.TestCurrentClusterName)
s.mockClusterMetadata.On("GetAllClusterNames").Return(cluster.TestAllClusterNamesMap)
h.txProcessor = newTransferQueueProcessor(shardContextWrapper, h, s.mockVisibilityMgr, s.mockMatchingClient, s.mockHistoryClient)
h.timerProcessor = newTimerQueueProcessor(shardContextWrapper, h, s.mockExecutionMgr, s.logger)
h.historyEventNotifier.Start()
