Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dynamic configs for min size to remove attributes from mutable state and disabling fetch from visibility #6686

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,668 changes: 835 additions & 833 deletions api/persistence/v1/executions.pb.go

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1841,13 +1841,26 @@ the outbound standby task failed to be processed due to missing events.`,
`VisibilityProcessorEnableCloseWorkflowCleanup to clean up the mutable state after visibility
close task has been processed. Must use Elasticsearch as visibility store, otherwise workflow
data (eg: search attributes) will be lost after workflow is closed.`,
)
VisibilityProcessorRelocateAttributesMinBlobSize = NewNamespaceIntSetting(
"history.visibilityProcessorRelocateAttributesMinBlobSize",
0,
`VisibilityProcessorRelocateAttributesMinBlobSize is the minimum size in bytes of memo or search
attributes.`,
)
VisibilityQueueMaxReaderCount = NewGlobalIntSetting(
"history.visibilityQueueMaxReaderCount",
2,
`VisibilityQueueMaxReaderCount is the max number of readers in one multi-cursor visibility queue`,
)

DisableFetchRelocatableAttributesFromVisibility = NewNamespaceBoolSetting(
"history.disableFetchRelocatableAttributesFromVisibility",
false,
`DisableFetchRelocatableAttributesFromVisibility disables fetching memo and search attributes from
visibility if they were removed from the mutable state`,
)

ArchivalTaskBatchSize = NewGlobalIntSetting(
"history.archivalTaskBatchSize",
100,
Expand Down
2 changes: 1 addition & 1 deletion common/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ func TestMergeProtoExcludingFields(t *testing.T) {
&info.ParentClock,
&info.CloseTransferTaskId,
&info.CloseVisibilityTaskId,
&info.CloseVisibilityTaskCompleted,
&info.RelocatableAttributesRemoved,
&info.WorkflowExecutionTimerTaskStatus,
&info.SubStateMachinesByType,
&info.StateMachineTimers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,9 @@ message WorkflowExecutionInfo {
// Used to check if visibility close task is processed before deleting the workflow execution.
int64 close_visibility_task_id = 65;
google.protobuf.Timestamp close_time = 66;
bool close_visibility_task_completed = 67;
// Relocatable attributes are memo and search attributes. If they were removed, then they are not
// present in the mutable state, and they should be in visibility store.
bool relocatable_attributes_removed = 67;
temporal.server.api.workflow.v1.BaseExecutionInfo base_execution_info = 76;
// If using build-id based versioning: version stamp of the last worker to complete a
// workflow tasks for this workflow.
Expand Down
6 changes: 5 additions & 1 deletion service/history/api/describeworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,11 @@ func Invoke(
}
}

relocatableAttributes, err := workflow.RelocatableAttributesFetcherProvider(persistenceVisibilityMgr).Fetch(ctx, mutableState)
relocatableAttrsFetcher := workflow.RelocatableAttributesFetcherProvider(
shard.GetConfig(),
persistenceVisibilityMgr,
)
relocatableAttributes, err := relocatableAttrsFetcher.Fetch(ctx, mutableState)
if err != nil {
shard.GetLogger().Error(
"Failed to fetch relocatable attributes",
Expand Down
12 changes: 6 additions & 6 deletions service/history/archival_queue_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,13 +291,13 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
{
Name: "close visibility task complete",
Configure: func(p *params) {
p.CloseVisibilityTaskCompleted = true
p.RelocatableAttributesRemoved = true
},
},
{
Name: "get workflow execution from visibility error",
Configure: func(p *params) {
p.CloseVisibilityTaskCompleted = true
p.RelocatableAttributesRemoved = true
p.GetWorkflowExecutionError = errors.New("get workflow execution error")
p.ExpectedErrorSubstrings = []string{"get workflow execution error"}
p.ExpectArchive = false
Expand Down Expand Up @@ -431,7 +431,7 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
NamespaceId: tests.NamespaceID.String(),
ExecutionTime: timestamppb.New(p.ExecutionTime),
CloseTime: timestamppb.New(p.CloseTime),
CloseVisibilityTaskCompleted: p.CloseVisibilityTaskCompleted,
RelocatableAttributesRemoved: p.RelocatableAttributesRemoved,
}
mutableState.EXPECT().GetExecutionInfo().Return(executionInfo).AnyTimes()
executionState := &persistence.WorkflowExecutionState{
Expand Down Expand Up @@ -505,7 +505,7 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
}

visibilityManager := manager.NewMockVisibilityManager(p.Controller)
if p.CloseVisibilityTaskCompleted {
if p.RelocatableAttributesRemoved {
visibilityManager.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(
&manager.GetWorkflowExecutionResponse{Execution: &workflowpb.WorkflowExecutionInfo{
Memo: nil,
Expand All @@ -519,7 +519,7 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
a,
shardContext,
workflowCache,
workflow.RelocatableAttributesFetcherProvider(visibilityManager),
workflow.RelocatableAttributesFetcherProvider(shardContext.GetConfig(), visibilityManager),
p.MetricsHandler,
logger,
)
Expand Down Expand Up @@ -584,7 +584,7 @@ type params struct {
GetWorkflowCloseTimeError error
GetWorkflowExecutionDurationError error
GetCurrentBranchTokenError error
CloseVisibilityTaskCompleted bool
RelocatableAttributesRemoved bool
ExpectGetWorkflowExecution bool
GetWorkflowExecutionError error
LoadMutableStateError error
Expand Down
8 changes: 8 additions & 0 deletions service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,13 @@ type Config struct {
VisibilityProcessorPollBackoffInterval dynamicconfig.DurationPropertyFn
VisibilityProcessorEnsureCloseBeforeDelete dynamicconfig.BoolPropertyFn
VisibilityProcessorEnableCloseWorkflowCleanup dynamicconfig.BoolPropertyFnWithNamespaceFilter
VisibilityProcessorRelocateAttributesMinBlobSize dynamicconfig.IntPropertyFnWithNamespaceFilter
VisibilityQueueMaxReaderCount dynamicconfig.IntPropertyFn

// Disable fetching memo and search attributes from visibility in the event that they were removed
// from the mutable state in the close execution visibility task clean up.
DisableFetchRelocatableAttributesFromVisibility dynamicconfig.BoolPropertyFnWithNamespaceFilter

SearchAttributesNumberOfKeysLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
SearchAttributesSizeOfValueLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
SearchAttributesTotalSizeLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
Expand Down Expand Up @@ -622,8 +627,11 @@ func NewConfig(
VisibilityProcessorPollBackoffInterval: dynamicconfig.VisibilityProcessorPollBackoffInterval.Get(dc),
VisibilityProcessorEnsureCloseBeforeDelete: dynamicconfig.VisibilityProcessorEnsureCloseBeforeDelete.Get(dc),
VisibilityProcessorEnableCloseWorkflowCleanup: dynamicconfig.VisibilityProcessorEnableCloseWorkflowCleanup.Get(dc),
VisibilityProcessorRelocateAttributesMinBlobSize: dynamicconfig.VisibilityProcessorRelocateAttributesMinBlobSize.Get(dc),
VisibilityQueueMaxReaderCount: dynamicconfig.VisibilityQueueMaxReaderCount.Get(dc),

DisableFetchRelocatableAttributesFromVisibility: dynamicconfig.DisableFetchRelocatableAttributesFromVisibility.Get(dc),

SearchAttributesNumberOfKeysLimit: dynamicconfig.SearchAttributesNumberOfKeysLimit.Get(dc),
SearchAttributesSizeOfValueLimit: dynamicconfig.SearchAttributesSizeOfValueLimit.Get(dc),
SearchAttributesTotalSizeLimit: dynamicconfig.SearchAttributesTotalSizeLimit.Get(dc),
Expand Down
1 change: 1 addition & 0 deletions service/history/visibility_queue_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func (f *visibilityQueueFactory) CreateQueue(
f.MetricsHandler,
f.Config.VisibilityProcessorEnsureCloseBeforeDelete,
f.Config.VisibilityProcessorEnableCloseWorkflowCleanup,
f.Config.VisibilityProcessorRelocateAttributesMinBlobSize,
)
if f.ExecutorWrapper != nil {
executor = f.ExecutorWrapper.Wrap(executor)
Expand Down
29 changes: 23 additions & 6 deletions service/history/visibility_queue_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ type (
metricProvider metrics.Handler
visibilityMgr manager.VisibilityManager

ensureCloseBeforeDelete dynamicconfig.BoolPropertyFn
enableCloseWorkflowCleanup dynamicconfig.BoolPropertyFnWithNamespaceFilter
ensureCloseBeforeDelete dynamicconfig.BoolPropertyFn
enableCloseWorkflowCleanup dynamicconfig.BoolPropertyFnWithNamespaceFilter
relocateAttributesMinBlobSize dynamicconfig.IntPropertyFnWithNamespaceFilter
}
)

Expand All @@ -70,6 +71,7 @@ func newVisibilityQueueTaskExecutor(
metricProvider metrics.Handler,
ensureCloseBeforeDelete dynamicconfig.BoolPropertyFn,
enableCloseWorkflowCleanup dynamicconfig.BoolPropertyFnWithNamespaceFilter,
relocateAttributesMinBlobSize dynamicconfig.IntPropertyFnWithNamespaceFilter,
) queues.Executor {
return &visibilityQueueTaskExecutor{
shardContext: shardContext,
Expand All @@ -78,8 +80,9 @@ func newVisibilityQueueTaskExecutor(
metricProvider: metricProvider,
visibilityMgr: visibilityMgr,

ensureCloseBeforeDelete: ensureCloseBeforeDelete,
enableCloseWorkflowCleanup: enableCloseWorkflowCleanup,
ensureCloseBeforeDelete: ensureCloseBeforeDelete,
enableCloseWorkflowCleanup: enableCloseWorkflowCleanup,
relocateAttributesMinBlobSize: relocateAttributesMinBlobSize,
}
}

Expand Down Expand Up @@ -305,12 +308,26 @@ func (t *visibilityQueueTaskExecutor) processCloseExecution(
// Therefore, ctx timeout might be already expired
// and parentCtx (which doesn't have timeout) must be used everywhere bellow.

if t.enableCloseWorkflowCleanup(namespaceEntry.Name().String()) {
if t.needRunCleanUp(requestBase) {
return t.cleanupExecutionInfo(parentCtx, task)
}
return nil
}

func (t *visibilityQueueTaskExecutor) needRunCleanUp(
request *manager.VisibilityRequestBase,
) bool {
if !t.enableCloseWorkflowCleanup(request.Namespace.String()) {
return false
}
// If there are no memo nor search attributes, then no clean up is necessary.
if len(request.Memo.GetFields()) == 0 && len(request.SearchAttributes.GetIndexedFields()) == 0 {
return false
}
minSize := t.relocateAttributesMinBlobSize(request.Namespace.String())
return request.Memo.Size() >= minSize || request.SearchAttributes.Size() >= minSize
}

func (t *visibilityQueueTaskExecutor) processDeleteExecution(
ctx context.Context,
task *tasks.DeleteExecutionVisibilityTask,
Expand Down Expand Up @@ -444,7 +461,7 @@ func (t *visibilityQueueTaskExecutor) cleanupExecutionInfo(
executionInfo := mutableState.GetExecutionInfo()
executionInfo.Memo = nil
executionInfo.SearchAttributes = nil
executionInfo.CloseVisibilityTaskCompleted = true
executionInfo.RelocatableAttributesRemoved = true
return weContext.SetWorkflowExecution(ctx, t.shardContext)
}

Expand Down
1 change: 1 addition & 0 deletions service/history/visibility_queue_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ func (s *visibilityQueueTaskExecutorSuite) SetupTest() {
metrics.NoopMetricsHandler,
config.VisibilityProcessorEnsureCloseBeforeDelete,
func(_ string) bool { return s.enableCloseWorkflowCleanup },
config.VisibilityProcessorRelocateAttributesMinBlobSize,
)
}

Expand Down
2 changes: 1 addition & 1 deletion service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -6630,7 +6630,7 @@ func (ms *MutableStateImpl) syncExecutionInfo(current *persistencespb.WorkflowEx
&info.ParentClock,
&info.CloseTransferTaskId,
&info.CloseVisibilityTaskId,
&info.CloseVisibilityTaskCompleted,
&info.RelocatableAttributesRemoved,
&info.WorkflowExecutionTimerTaskStatus,
&info.SubStateMachinesByType,
&info.StateMachineTimers,
Expand Down
2 changes: 1 addition & 1 deletion service/history/workflow/mutable_state_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3206,7 +3206,7 @@ func (s *mutableStateSuite) verifyExecutionInfo(current, target, origin *persist
s.True(proto.Equal(origin.ParentClock, current.ParentClock), "ParentClock mismatch")
s.Equal(origin.CloseTransferTaskId, current.CloseTransferTaskId, "CloseTransferTaskId mismatch")
s.Equal(origin.CloseVisibilityTaskId, current.CloseVisibilityTaskId, "CloseVisibilityTaskId mismatch")
s.Equal(origin.CloseVisibilityTaskCompleted, current.CloseVisibilityTaskCompleted, "CloseVisibilityTaskCompleted mismatch")
s.Equal(origin.RelocatableAttributesRemoved, current.RelocatableAttributesRemoved, "RelocatableAttributesRemoved mismatch")
s.Equal(origin.WorkflowExecutionTimerTaskStatus, current.WorkflowExecutionTimerTaskStatus, "WorkflowExecutionTimerTaskStatus mismatch")
s.Equal(origin.SubStateMachinesByType, current.SubStateMachinesByType, "SubStateMachinesByType mismatch")
s.Equal(origin.StateMachineTimers, current.StateMachineTimers, "StateMachineTimers mismatch")
Expand Down
23 changes: 16 additions & 7 deletions service/history/workflow/relocatable_attributes_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ import (
"context"

commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/persistence/visibility/manager"
"go.temporal.io/server/service/history/configs"
)

// RelocatableAttributesFetcher is used to fetch the relocatable attributes from the mutable state.
Expand All @@ -44,18 +46,19 @@ type RelocatableAttributesFetcher interface {
// The manager.VisibilityManager parameter is used to fetch the relocatable attributes from the persistence backend iff
// we already moved them there out from the mutable state.
// The visibility manager is not used if the relocatable attributes are still in the mutable state.
// We detect that the fields have moved by checking if the CloseExecutionVisibilityTask for this workflow execution is
// marked as complete in the mutable state.
// We detect that the fields have moved by checking the RelocatableAttributesRemoved flag in the mutable state.
// Because the relocatable fields that we push to persistence are never updated thereafter,
// we may cache them on a per-workflow execution basis.
// Currently, there is no cache, but you may provide a manager.VisibilityManager that supports caching to this function
// safely.
// TODO: Add a cache around the visibility manager for the relocatable attributes.
func RelocatableAttributesFetcherProvider(
config *configs.Config,
visibilityManager manager.VisibilityManager,
) RelocatableAttributesFetcher {
return &relocatableAttributesFetcher{
visibilityManager: visibilityManager,
visibilityManager: visibilityManager,
disableFetchFromVisibility: config.DisableFetchRelocatableAttributesFromVisibility,
}
}

Expand All @@ -69,25 +72,31 @@ type RelocatableAttributes struct {
// relocatableAttributesFetcher is the default implementation of RelocatableAttributesFetcher.
type relocatableAttributesFetcher struct {
visibilityManager manager.VisibilityManager

disableFetchFromVisibility dynamicconfig.BoolPropertyFnWithNamespaceFilter
}

// Fetch fetches the relocatable attributes from the mutable state or the persistence backend.
// First, it checks if the close visibility task is completed. If it is completed, then the relocatable attributes
// First, it checks if the close visibility task clean up was executed. If it was, then the relocatable attributes
// are fetched from the persistence backend. Otherwise, the relocatable attributes are fetched from the mutable state.
func (f *relocatableAttributesFetcher) Fetch(
ctx context.Context,
mutableState MutableState,
) (*RelocatableAttributes, error) {
executionInfo := mutableState.GetExecutionInfo()
// If we haven't processed close visibility task yet, then we can fetch the search attributes and memo from the
// mutable state.
if !executionInfo.GetCloseVisibilityTaskCompleted() {
// If the relocatable attributes were not removed from mutable state, then we can fetch the memo
// and search attributes from the mutable state.
if !executionInfo.GetRelocatableAttributesRemoved() {
return &RelocatableAttributes{
Memo: &commonpb.Memo{Fields: executionInfo.Memo},
SearchAttributes: &commonpb.SearchAttributes{IndexedFields: executionInfo.SearchAttributes},
}, nil
}

if f.disableFetchFromVisibility(mutableState.GetNamespaceEntry().Name().String()) {
return &RelocatableAttributes{}, nil
}

// If we have processed close visibility task, then we need to fetch the search attributes and memo from the
// persistence backend because we have already deleted them from the mutable state.
executionState := mutableState.GetExecutionState()
Expand Down
30 changes: 22 additions & 8 deletions service/history/workflow/relocatable_attributes_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"go.temporal.io/api/common/v1"
"go.temporal.io/api/workflow/v1"
"go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/persistence/visibility/manager"
"go.temporal.io/server/service/history/tests"
"go.uber.org/mock/gomock"
Expand All @@ -58,32 +59,43 @@ func TestRelocatableAttributesFetcher_Fetch(t *testing.T) {
"searchAttributesLocation": {Data: []byte("persistence")},
}},
}

emptyAttributes := &RelocatableAttributes{}

require.NotEqual(t, mutableStateAttributes.Memo, persistenceAttributes.Memo)
require.NotEqual(t, mutableStateAttributes.SearchAttributes, persistenceAttributes.SearchAttributes)
testErr := errors.New("test error")
for _, c := range []*struct {
Name string
CloseVisibilityTaskCompleted bool
RelocatableAttributesRemoved bool
DisableFetchFromVisibility bool
GetWorkflowExecutionErr error

ExpectedInfo *RelocatableAttributes
ExpectedErr error
}{
{
Name: "CloseVisibilityTaskNotComplete",
CloseVisibilityTaskCompleted: false,
RelocatableAttributesRemoved: false,

ExpectedInfo: mutableStateAttributes,
},
{
Name: "CloseVisibilityTaskCompleted",
CloseVisibilityTaskCompleted: true,
Name: "RelocatableAttributesRemoved",
RelocatableAttributesRemoved: true,

ExpectedInfo: persistenceAttributes,
},
{
Name: "RelocatableAttributesRemoved DisableFetchFromVisibility",
RelocatableAttributesRemoved: true,
DisableFetchFromVisibility: true,

ExpectedInfo: emptyAttributes,
},
{
Name: "GetWorkflowExecutionErr",
CloseVisibilityTaskCompleted: true,
RelocatableAttributesRemoved: true,
GetWorkflowExecutionErr: testErr,

ExpectedErr: testErr,
Expand All @@ -96,7 +108,7 @@ func TestRelocatableAttributesFetcher_Fetch(t *testing.T) {
executionInfo := &persistence.WorkflowExecutionInfo{
Memo: mutableStateAttributes.Memo.Fields,
SearchAttributes: mutableStateAttributes.SearchAttributes.IndexedFields,
CloseVisibilityTaskCompleted: c.CloseVisibilityTaskCompleted,
RelocatableAttributesRemoved: c.RelocatableAttributesRemoved,
CloseTime: timestamppb.New(closeTime),
WorkflowId: tests.WorkflowID,
}
Expand All @@ -110,7 +122,7 @@ func TestRelocatableAttributesFetcher_Fetch(t *testing.T) {
mutableState.EXPECT().GetExecutionInfo().Return(executionInfo).AnyTimes()
mutableState.EXPECT().GetNamespaceEntry().Return(namespaceEntry).AnyTimes()
mutableState.EXPECT().GetExecutionState().Return(executionState).AnyTimes()
if c.CloseVisibilityTaskCompleted {
if c.RelocatableAttributesRemoved && !c.DisableFetchFromVisibility {
visibilityManager.EXPECT().GetWorkflowExecution(gomock.Any(), &manager.GetWorkflowExecutionRequest{
NamespaceID: namespaceEntry.ID(),
Namespace: namespaceEntry.Name(),
Expand All @@ -125,7 +137,9 @@ func TestRelocatableAttributesFetcher_Fetch(t *testing.T) {
}
ctx := context.Background()

fetcher := RelocatableAttributesFetcherProvider(visibilityManager)
cfg := tests.NewDynamicConfig()
cfg.DisableFetchRelocatableAttributesFromVisibility = dynamicconfig.GetBoolPropertyFnFilteredByNamespace(c.DisableFetchFromVisibility)
fetcher := RelocatableAttributesFetcherProvider(cfg, visibilityManager)
info, err := fetcher.Fetch(ctx, mutableState)

if c.ExpectedErr != nil {
Expand Down
Loading