Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Made execution.Cache an interface so we can mock it in unit tests #6058

Merged
merged 2 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions service/history/decision/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type (
shard shard.Context
timeSource clock.TimeSource
domainCache cache.DomainCache
executionCache *execution.Cache
executionCache execution.Cache
tokenSerializer common.TaskTokenSerializer
metricsClient metrics.Client
logger log.Logger
Expand All @@ -74,7 +74,7 @@ type (
// NewHandler creates a new Handler for handling decision business logic
func NewHandler(
shard shard.Context,
executionCache *execution.Cache,
executionCache execution.Cache,
tokenSerializer common.TaskTokenSerializer,
) Handler {
config := shard.GetConfig()
Expand Down
2 changes: 1 addition & 1 deletion service/history/decision/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (s *DecisionHandlerSuite) TestNewHandler() {
shardContext.EXPECT().GetDomainCache().Times(2)
shardContext.EXPECT().GetMetricsClient().Times(2)
shardContext.EXPECT().GetThrottledLogger().Times(1).Return(testlogger.New(s.T()))
h := NewHandler(shardContext, &execution.Cache{}, tokenSerializer)
h := NewHandler(shardContext, execution.NewMockCache(s.controller), tokenSerializer)
s.NotNil(h)
s.Equal("handlerImpl", reflect.ValueOf(h).Elem().Type().Name())
}
Expand Down
4 changes: 2 additions & 2 deletions service/history/engine/engineimpl/history_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ type historyEngineImpl struct {
nDCActivityReplicator ndc.ActivityReplicator
historyEventNotifier events.Notifier
tokenSerializer common.TaskTokenSerializer
executionCache *execution.Cache
executionCache execution.Cache
metricsClient metrics.Client
logger log.Logger
throttledLogger log.Logger
Expand All @@ -120,7 +120,7 @@ type historyEngineImpl struct {
wfIDCache workflowcache.WFCache
ratelimitInternalPerWorkflowID dynamicconfig.BoolPropertyFnWithDomainFilter

updateWithActionFn func(context.Context, *execution.Cache, string, types.WorkflowExecution, bool, time.Time, func(wfContext execution.Context, mutableState execution.MutableState) error) error
updateWithActionFn func(context.Context, execution.Cache, string, types.WorkflowExecution, bool, time.Time, func(wfContext execution.Context, mutableState execution.MutableState) error) error
}

var (
Expand Down
2 changes: 1 addition & 1 deletion service/history/engine/engineimpl/history_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5450,7 +5450,7 @@ func TestRecordChildExecutionCompleted(t *testing.T) {
timeSource: mockShard.GetTimeSource(),
metricsClient: metrics.NewClient(tally.NoopScope, metrics.History),
logger: mockShard.GetLogger(),
updateWithActionFn: func(_ context.Context, _ *execution.Cache, _ string, _ types.WorkflowExecution, _ bool, _ time.Time, actionFn func(wfContext execution.Context, mutableState execution.MutableState) error) error {
updateWithActionFn: func(_ context.Context, _ execution.Cache, _ string, _ types.WorkflowExecution, _ bool, _ time.Time, actionFn func(wfContext execution.Context, mutableState execution.MutableState) error) error {
return actionFn(nil, ms)
},
}
Expand Down
66 changes: 54 additions & 12 deletions service/history/execution/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination cache_mock.go

package execution

import (
Expand All @@ -40,12 +42,52 @@ import (
"github.com/uber/cadence/service/history/shard"
)

// Cache is a cache that holds workflow execution context
type Cache interface {
cache.Cache

// GetOrCreateCurrentWorkflowExecution gets or creates workflow execution context for the current run
GetOrCreateCurrentWorkflowExecution(
ctx context.Context,
domainID string,
workflowID string,
) (Context, ReleaseFunc, error)

// GetAndCreateWorkflowExecution is for analyzing mutableState, it will try getting Context from cache
// and also load from database
GetAndCreateWorkflowExecution(
ctx context.Context,
domainID string,
execution types.WorkflowExecution,
) (Context, Context, ReleaseFunc, bool, error)

// GetOrCreateWorkflowExecutionForBackground gets or creates workflow execution context with background context
GetOrCreateWorkflowExecutionForBackground(
domainID string,
execution types.WorkflowExecution,
) (Context, ReleaseFunc, error)

// GetOrCreateWorkflowExecutionWithTimeout gets or creates workflow execution context with timeout
GetOrCreateWorkflowExecutionWithTimeout(
domainID string,
execution types.WorkflowExecution,
timeout time.Duration,
) (Context, ReleaseFunc, error)

// GetOrCreateWorkflowExecution gets or creates workflow execution context
GetOrCreateWorkflowExecution(
ctx context.Context,
domainID string,
execution types.WorkflowExecution,
) (Context, ReleaseFunc, error)
}

type (
// ReleaseFunc releases workflow execution context
ReleaseFunc func(err error)

// Cache caches workflow execution context
Cache struct {
cacheImpl struct {
cache.Cache
shard shard.Context
executionManager persistence.ExecutionManager
Expand All @@ -67,15 +109,15 @@ const (
)

// NewCache creates a new workflow execution context cache
func NewCache(shard shard.Context) *Cache {
func NewCache(shard shard.Context) Cache {
opts := &cache.Options{}
config := shard.GetConfig()
opts.InitialCapacity = config.HistoryCacheInitialSize()
opts.TTL = config.HistoryCacheTTL()
opts.Pin = true
opts.MaxCount = config.HistoryCacheMaxSize()

return &Cache{
return &cacheImpl{
Cache: cache.New(opts),
shard: shard,
executionManager: shard.GetExecutionManager(),
Expand All @@ -86,7 +128,7 @@ func NewCache(shard shard.Context) *Cache {
}

// GetOrCreateCurrentWorkflowExecution gets or creates workflow execution context for the current run
func (c *Cache) GetOrCreateCurrentWorkflowExecution(
func (c *cacheImpl) GetOrCreateCurrentWorkflowExecution(
ctx context.Context,
domainID string,
workflowID string,
Expand Down Expand Up @@ -115,7 +157,7 @@ func (c *Cache) GetOrCreateCurrentWorkflowExecution(

// GetAndCreateWorkflowExecution is for analyzing mutableState, it will try getting Context from cache
// and also load from database
func (c *Cache) GetAndCreateWorkflowExecution(
func (c *cacheImpl) GetAndCreateWorkflowExecution(
ctx context.Context,
domainID string,
execution types.WorkflowExecution,
Expand Down Expand Up @@ -157,7 +199,7 @@ func (c *Cache) GetAndCreateWorkflowExecution(

// GetOrCreateWorkflowExecutionForBackground gets or creates workflow execution context with background context
// currently only used in tests
func (c *Cache) GetOrCreateWorkflowExecutionForBackground(
func (c *cacheImpl) GetOrCreateWorkflowExecutionForBackground(
domainID string,
execution types.WorkflowExecution,
) (Context, ReleaseFunc, error) {
Expand All @@ -166,7 +208,7 @@ func (c *Cache) GetOrCreateWorkflowExecutionForBackground(
}

// GetOrCreateWorkflowExecutionWithTimeout gets or creates workflow execution context with timeout
func (c *Cache) GetOrCreateWorkflowExecutionWithTimeout(
func (c *cacheImpl) GetOrCreateWorkflowExecutionWithTimeout(
domainID string,
execution types.WorkflowExecution,
timeout time.Duration,
Expand All @@ -179,7 +221,7 @@ func (c *Cache) GetOrCreateWorkflowExecutionWithTimeout(
}

// GetOrCreateWorkflowExecution gets or creates workflow execution context
func (c *Cache) GetOrCreateWorkflowExecution(
func (c *cacheImpl) GetOrCreateWorkflowExecution(
ctx context.Context,
domainID string,
execution types.WorkflowExecution,
Expand All @@ -204,7 +246,7 @@ func (c *Cache) GetOrCreateWorkflowExecution(
)
}

func (c *Cache) getOrCreateWorkflowExecutionInternal(
func (c *cacheImpl) getOrCreateWorkflowExecutionInternal(
ctx context.Context,
domainID string,
execution types.WorkflowExecution,
Expand Down Expand Up @@ -245,7 +287,7 @@ func (c *Cache) getOrCreateWorkflowExecutionInternal(
return workflowCtx, releaseFunc, nil
}

func (c *Cache) validateWorkflowExecutionInfo(
func (c *cacheImpl) validateWorkflowExecutionInfo(
ctx context.Context,
domainID string,
execution *types.WorkflowExecution,
Expand Down Expand Up @@ -277,7 +319,7 @@ func (c *Cache) validateWorkflowExecutionInfo(
return nil
}

func (c *Cache) makeReleaseFunc(
func (c *cacheImpl) makeReleaseFunc(
key definition.WorkflowIdentifier,
context Context,
forceClearContext bool,
Expand Down Expand Up @@ -305,7 +347,7 @@ func (c *Cache) makeReleaseFunc(
}
}

func (c *Cache) getCurrentExecutionWithRetry(
func (c *cacheImpl) getCurrentExecutionWithRetry(
ctx context.Context,
request *persistence.GetCurrentExecutionRequest,
) (*persistence.GetCurrentExecutionResponse, error) {
Expand Down
Loading
Loading