Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor history cache to its own package #3601

Merged
merged 7 commits into from
Dec 6, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions service/history/api/consistency_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/versionhistory"
historyCache "go.temporal.io/server/service/history/cache"
mindaugasrukas marked this conversation as resolved.
Show resolved Hide resolved
"go.temporal.io/server/service/history/consts"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/vclock"
Expand All @@ -47,7 +48,7 @@ type (
MutableStateConsistencyPredicate func(mutableState workflow.MutableState) bool

WorkflowConsistencyChecker interface {
GetWorkflowCache() workflow.Cache
GetWorkflowCache() historyCache.Cache
GetCurrentRunID(
ctx context.Context,
namespaceID string,
Expand All @@ -63,21 +64,21 @@ type (

WorkflowConsistencyCheckerImpl struct {
shardContext shard.Context
workflowCache workflow.Cache
workflowCache historyCache.Cache
}
)

func NewWorkflowConsistencyChecker(
shardContext shard.Context,
workflowCache workflow.Cache,
workflowCache historyCache.Cache,
) *WorkflowConsistencyCheckerImpl {
return &WorkflowConsistencyCheckerImpl{
shardContext: shardContext,
workflowCache: workflowCache,
}
}

func (c *WorkflowConsistencyCheckerImpl) GetWorkflowCache() workflow.Cache {
func (c *WorkflowConsistencyCheckerImpl) GetWorkflowCache() historyCache.Cache {
return c.workflowCache
}

Expand Down
5 changes: 3 additions & 2 deletions service/history/api/consistency_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/versionhistory"
historyCache "go.temporal.io/server/service/history/cache"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/workflow"
)
Expand All @@ -54,7 +55,7 @@ type (

controller *gomock.Controller
shardContext *shard.MockContext
workflowCache *workflow.MockCache
workflowCache *historyCache.MockCache

shardID int32
namespaceID string
Expand Down Expand Up @@ -82,7 +83,7 @@ func (s *workflowConsistencyCheckerSuite) SetupTest() {

s.controller = gomock.NewController(s.T())
s.shardContext = shard.NewMockContext(s.controller)
s.workflowCache = workflow.NewMockCache(s.controller)
s.workflowCache = historyCache.NewMockCache(s.controller)

s.shardID = rand.Int31()
s.namespaceID = uuid.New().String()
Expand Down
3 changes: 2 additions & 1 deletion service/history/api/create_workflow_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/rpc/interceptor"
historyCache "go.temporal.io/server/service/history/cache"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/workflow"
)
Expand Down Expand Up @@ -117,7 +118,7 @@ func NewWorkflowWithSignal(
),
shard.GetLogger(),
)
return NewWorkflowContext(newWorkflowContext, workflow.NoopReleaseFn, newMutableState), nil
return NewWorkflowContext(newWorkflowContext, historyCache.NoopReleaseFn, newMutableState), nil
}

func CreateMutableState(
Expand Down
3 changes: 2 additions & 1 deletion service/history/api/deleteworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/consts"
"go.temporal.io/server/service/history/deletemanager"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/workflow"
)
Expand All @@ -43,7 +44,7 @@ func Invoke(
request *historyservice.DeleteWorkflowExecutionRequest,
shard shard.Context,
workflowConsistencyChecker api.WorkflowConsistencyChecker,
workflowDeleteManager workflow.DeleteManager,
workflowDeleteManager deletemanager.DeleteManager,
) (_ *historyservice.DeleteWorkflowExecutionResponse, retError error) {
weCtx, err := workflowConsistencyChecker.GetWorkflowContext(
ctx,
Expand Down
4 changes: 2 additions & 2 deletions service/history/api/reapplyevents/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ import (
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence/versionhistory"
"go.temporal.io/server/service/history/api"
historyCache "go.temporal.io/server/service/history/cache"
"go.temporal.io/server/service/history/ndc"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/workflow"
)

func Invoke(
Expand Down Expand Up @@ -156,7 +156,7 @@ func Invoke(
shard.GetClusterMetadata(),
context,
mutableState,
workflow.NoopReleaseFn,
historyCache.NoopReleaseFn,
),
ndc.EventsReapplicationResetWorkflowReason,
toReapplyEvents,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/log"
"go.temporal.io/server/service/history/api"
historyCache "go.temporal.io/server/service/history/cache"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tests"
"go.temporal.io/server/service/history/workflow"
Expand Down Expand Up @@ -111,7 +112,7 @@ func (s *signalWithStartWorkflowSuite) TestSignalWorkflow_Dedup() {
ctx := context.Background()
currentWorkflowContext := api.NewWorkflowContext(
s.currentContext,
workflow.NoopReleaseFn,
historyCache.NoopReleaseFn,
s.currentMutableState,
)
request := s.randomRequest()
Expand All @@ -131,7 +132,7 @@ func (s *signalWithStartWorkflowSuite) TestSignalWorkflow_NewWorkflowTask() {
ctx := context.Background()
currentWorkflowContext := api.NewWorkflowContext(
s.currentContext,
workflow.NoopReleaseFn,
historyCache.NoopReleaseFn,
s.currentMutableState,
)
request := s.randomRequest()
Expand Down Expand Up @@ -161,7 +162,7 @@ func (s *signalWithStartWorkflowSuite) TestSignalWorkflow_NoNewWorkflowTask() {
ctx := context.Background()
currentWorkflowContext := api.NewWorkflowContext(
s.currentContext,
workflow.NoopReleaseFn,
historyCache.NoopReleaseFn,
s.currentMutableState,
)
request := s.randomRequest()
Expand Down
9 changes: 5 additions & 4 deletions service/history/api/workflow_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ package api
import (
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/namespace"
historyCache "go.temporal.io/server/service/history/cache"
"go.temporal.io/server/service/history/workflow"
)

type WorkflowContext interface {
GetContext() workflow.Context
GetMutableState() workflow.MutableState
GetReleaseFn() workflow.ReleaseCacheFunc
GetReleaseFn() historyCache.ReleaseCacheFunc

GetNamespaceEntry() *namespace.Namespace
GetWorkflowKey() definition.WorkflowKey
Expand All @@ -42,7 +43,7 @@ type WorkflowContext interface {
type WorkflowContextImpl struct {
context workflow.Context
mutableState workflow.MutableState
releaseFn workflow.ReleaseCacheFunc
releaseFn historyCache.ReleaseCacheFunc
}

type UpdateWorkflowAction struct {
Expand All @@ -65,7 +66,7 @@ var _ WorkflowContext = (*WorkflowContextImpl)(nil)

func NewWorkflowContext(
context workflow.Context,
releaseFn workflow.ReleaseCacheFunc,
releaseFn historyCache.ReleaseCacheFunc,
mutableState workflow.MutableState,
) *WorkflowContextImpl {

Expand All @@ -84,7 +85,7 @@ func (w *WorkflowContextImpl) GetMutableState() workflow.MutableState {
return w.mutableState
}

func (w *WorkflowContextImpl) GetReleaseFn() workflow.ReleaseCacheFunc {
func (w *WorkflowContextImpl) GetReleaseFn() historyCache.ReleaseCacheFunc {
return w.releaseFn
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

//go:generate mockgen -copyright_file ../../../LICENSE -package $GOPACKAGE -source $GOFILE -destination cache_mock.go

package workflow
package cache

import (
"context"
Expand All @@ -45,6 +45,7 @@ import (
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/workflow"
)

type (
Expand All @@ -55,8 +56,8 @@ type (
ctx context.Context,
namespaceID namespace.ID,
execution commonpb.WorkflowExecution,
caller CallerType,
) (Context, ReleaseCacheFunc, error)
caller workflow.CallerType,
) (workflow.Context, ReleaseCacheFunc, error)
}

CacheImpl struct {
Expand Down Expand Up @@ -97,8 +98,8 @@ func (c *CacheImpl) GetOrCreateWorkflowExecution(
ctx context.Context,
namespaceID namespace.ID,
execution commonpb.WorkflowExecution,
caller CallerType,
) (Context, ReleaseCacheFunc, error) {
caller workflow.CallerType,
) (workflow.Context, ReleaseCacheFunc, error) {

if err := c.validateWorkflowExecutionInfo(ctx, namespaceID, &execution); err != nil {
return nil, nil, err
Expand Down Expand Up @@ -129,21 +130,21 @@ func (c *CacheImpl) getOrCreateWorkflowExecutionInternal(
execution commonpb.WorkflowExecution,
handler metrics.MetricsHandler,
forceClearContext bool,
caller CallerType,
) (Context, ReleaseCacheFunc, error) {
caller workflow.CallerType,
) (workflow.Context, ReleaseCacheFunc, error) {

key := definition.NewWorkflowKey(namespaceID.String(), execution.GetWorkflowId(), execution.GetRunId())
workflowCtx, cacheHit := c.Get(key).(Context)
workflowCtx, cacheHit := c.Get(key).(workflow.Context)
if !cacheHit {
handler.Counter(metrics.CacheMissCounter.GetMetricName()).Record(1)
// Let's create the workflow execution workflowCtx
workflowCtx = NewContext(c.shard, key, c.logger)
workflowCtx = workflow.NewContext(c.shard, key, c.logger)
elem, err := c.PutIfNotExist(key, workflowCtx)
if err != nil {
handler.Counter(metrics.CacheFailures.GetMetricName()).Record(1)
return nil, nil, err
}
workflowCtx = elem.(Context)
workflowCtx = elem.(workflow.Context)
}

// TODO This will create a closure on every request.
Expand All @@ -162,9 +163,9 @@ func (c *CacheImpl) getOrCreateWorkflowExecutionInternal(

func (c *CacheImpl) makeReleaseFunc(
key definition.WorkflowKey,
context Context,
context workflow.Context,
forceClearContext bool,
caller CallerType,
caller workflow.CallerType,
) func(error) {

status := cacheNotReleased
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading