From 285bf333f85db37a88e880c40cd3fc211140fa81 Mon Sep 17 00:00:00 2001 From: Michael Snowden Date: Tue, 24 Jan 2023 14:51:33 -0800 Subject: [PATCH] Start the archival queue iff history or visibility archival is enabled in the static config (#3827) Enable archival queue factory iff it is enabled in the static config --- common/archiver/archivalMetadata.go | 5 + common/archiver/archivalMetadata_mock.go | 14 ++ service/history/historyEngineFactory.go | 2 +- service/history/queueFactoryBase.go | 38 +++- service/history/queue_factory_base_test.go | 192 +++++++++++++++++++++ 5 files changed, 245 insertions(+), 6 deletions(-) create mode 100644 service/history/queue_factory_base_test.go diff --git a/common/archiver/archivalMetadata.go b/common/archiver/archivalMetadata.go index 6e23928f7fc..83407caf9a3 100644 --- a/common/archiver/archivalMetadata.go +++ b/common/archiver/archivalMetadata.go @@ -52,6 +52,7 @@ type ( ReadEnabled() bool GetNamespaceDefaultState() enumspb.ArchivalState GetNamespaceDefaultURI() string + StaticClusterState() ArchivalState } archivalMetadata struct { @@ -71,6 +72,10 @@ type ( ArchivalState int ) +func (a *archivalConfig) StaticClusterState() ArchivalState { + return a.staticClusterState +} + const ( // ArchivalDisabled means this cluster is not configured to handle archival ArchivalDisabled ArchivalState = iota diff --git a/common/archiver/archivalMetadata_mock.go b/common/archiver/archivalMetadata_mock.go index 6b648f40f58..5fac01d828f 100644 --- a/common/archiver/archivalMetadata_mock.go +++ b/common/archiver/archivalMetadata_mock.go @@ -178,3 +178,17 @@ func (mr *MockArchivalConfigMockRecorder) ReadEnabled() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadEnabled", reflect.TypeOf((*MockArchivalConfig)(nil).ReadEnabled)) } + +// StaticClusterState mocks base method. +func (m *MockArchivalConfig) StaticClusterState() ArchivalState { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StaticClusterState") + ret0, _ := ret[0].(ArchivalState) + return ret0 +} + +// StaticClusterState indicates an expected call of StaticClusterState. +func (mr *MockArchivalConfigMockRecorder) StaticClusterState() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StaticClusterState", reflect.TypeOf((*MockArchivalConfig)(nil).StaticClusterState)) +} diff --git a/service/history/historyEngineFactory.go b/service/history/historyEngineFactory.go index e19d2fe38f9..d3377dc9916 100644 --- a/service/history/historyEngineFactory.go +++ b/service/history/historyEngineFactory.go @@ -55,7 +55,7 @@ type ( NewCacheFn wcache.NewCacheFn ArchivalClient archiver.Client EventSerializer serialization.Serializer - QueueFactories []QueueFactory `group:"queueFactory"` + QueueFactories []QueueFactory ReplicationTaskFetcherFactory replication.TaskFetcherFactory ReplicationTaskExecutorProvider replication.TaskExecutorProvider TracerProvider trace.TracerProvider diff --git a/service/history/queueFactoryBase.go b/service/history/queueFactoryBase.go index 07b51852e53..66cf29bc7a3 100644 --- a/service/history/queueFactoryBase.go +++ b/service/history/queueFactoryBase.go @@ -31,6 +31,7 @@ import ( "go.uber.org/fx" "go.temporal.io/server/common" + "go.temporal.io/server/common/archiver" "go.temporal.io/server/common/clock" "go.temporal.io/server/common/cluster" "go.temporal.io/server/common/dynamicconfig" @@ -87,7 +88,7 @@ type ( fx.In Lifecycle fx.Lifecycle - Factories []QueueFactory `group:"queueFactory"` + Factories []QueueFactory } ) @@ -95,25 +96,52 @@ var QueueModule = fx.Options( fx.Provide(QueueSchedulerRateLimiterProvider), fx.Provide( fx.Annotated{ - Group: QueueFactoryFxGroup, + Name: "transferQueueFactory", Target: NewTransferQueueFactory, }, fx.Annotated{ - Group: QueueFactoryFxGroup, + Name: "timerQueueFactory", Target: NewTimerQueueFactory, }, fx.Annotated{ - Group: QueueFactoryFxGroup, + Name: "visibilityQueueFactory", Target: NewVisibilityQueueFactory, }, fx.Annotated{ - Group: QueueFactoryFxGroup, + Name: "archivalQueueFactory", Target: NewArchivalQueueFactory, }, + getQueueFactories, ), fx.Invoke(QueueFactoryLifetimeHooks), ) +type queueFactorySet struct { + fx.In + + TransferQueueFactory QueueFactory `name:"transferQueueFactory"` + TimerQueueFactory QueueFactory `name:"timerQueueFactory"` + VisibilityQueueFactory QueueFactory `name:"visibilityQueueFactory"` + ArchivalQueueFactory QueueFactory `name:"archivalQueueFactory"` +} + +// getQueueFactories returns factories for all the enabled queue types. +// The archival queue factory is only returned when archival is enabled in the static config. +func getQueueFactories( + queueFactorySet queueFactorySet, + archivalMetadata archiver.ArchivalMetadata, +) []QueueFactory { + factories := []QueueFactory{ + queueFactorySet.TransferQueueFactory, + queueFactorySet.TimerQueueFactory, + queueFactorySet.VisibilityQueueFactory, + } + if archivalMetadata.GetHistoryConfig().StaticClusterState() == archiver.ArchivalEnabled || archivalMetadata.GetVisibilityConfig().StaticClusterState() == archiver.ArchivalEnabled { + factories = append(factories, queueFactorySet.ArchivalQueueFactory) + } + return factories +} + func QueueSchedulerRateLimiterProvider( config *configs.Config, ) queues.SchedulerRateLimiter { diff --git a/service/history/queue_factory_base_test.go b/service/history/queue_factory_base_test.go new file mode 100644 index 00000000000..9d32b0edb1c --- /dev/null +++ b/service/history/queue_factory_base_test.go @@ -0,0 +1,192 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package history + +import ( + "testing" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + "go.uber.org/fx" + + "go.temporal.io/server/api/historyservice/v1" + "go.temporal.io/server/client" + carchiver "go.temporal.io/server/common/archiver" + "go.temporal.io/server/common/clock" + "go.temporal.io/server/common/cluster" + "go.temporal.io/server/common/dynamicconfig" + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/namespace" + "go.temporal.io/server/common/persistence/visibility/manager" + "go.temporal.io/server/common/resource" + "go.temporal.io/server/common/sdk" + "go.temporal.io/server/service/history/archival" + "go.temporal.io/server/service/history/configs" + "go.temporal.io/server/service/history/workflow" + "go.temporal.io/server/service/worker/archiver" +) + +// TestQueueModule_ArchivalQueueCreated tests that the archival queue is created if and only if the static config for +// either history or visibility archival is enabled. +func TestQueueModule_ArchivalQueue(t *testing.T) { + for _, c := range []moduleTestCase{ + { + Name: "Archival completely disabled", + HistoryState: carchiver.ArchivalDisabled, + VisibilityState: carchiver.ArchivalDisabled, + ExpectArchivalQueue: false, + }, + { + Name: "History archival enabled", + HistoryState: carchiver.ArchivalEnabled, + VisibilityState: carchiver.ArchivalDisabled, + ExpectArchivalQueue: true, + }, + { + Name: "Visibility archival enabled", + HistoryState: carchiver.ArchivalDisabled, + VisibilityState: carchiver.ArchivalEnabled, + ExpectArchivalQueue: true, + }, + { + Name: "Both history and visibility archival enabled", + HistoryState: carchiver.ArchivalEnabled, + VisibilityState: carchiver.ArchivalEnabled, + ExpectArchivalQueue: true, + }, + } { + c := c + t.Run(c.Name, c.Run) + } +} + +// moduleTestCase is a test case for the QueueModule. +type moduleTestCase struct { + Name string + HistoryState carchiver.ArchivalState + VisibilityState carchiver.ArchivalState + ExpectArchivalQueue bool +} + +// Run runs the test case. +func (c *moduleTestCase) Run(t *testing.T) { + t.Parallel() + controller := gomock.NewController(t) + dependencies := getModuleDependencies(controller, c) + var factories []QueueFactory + + app := fx.New( + dependencies, + QueueModule, + fx.Invoke(func(params QueueFactoriesLifetimeHookParams) { + factories = params.Factories + }), + ) + + require.NoError(t, app.Err()) + require.NotNil(t, factories) + var ( + txq QueueFactory + tiq QueueFactory + viq QueueFactory + aq QueueFactory + ) + for _, f := range factories { + switch f.(type) { + case *transferQueueFactory: + require.Nil(t, txq) + txq = f + case *timerQueueFactory: + require.Nil(t, tiq) + tiq = f + case *visibilityQueueFactory: + require.Nil(t, viq) + viq = f + case *archivalQueueFactory: + require.Nil(t, aq) + aq = f + } + } + require.NotNil(t, txq) + require.NotNil(t, tiq) + require.NotNil(t, viq) + if c.ExpectArchivalQueue { + require.NotNil(t, aq) + } else { + require.Nil(t, aq) + } +} + +// getModuleDependencies returns an fx.Option that provides all the dependencies needed for the queue module. +func getModuleDependencies(controller *gomock.Controller, c *moduleTestCase) fx.Option { + cfg := configs.NewConfig( + dynamicconfig.NewNoopCollection(), + 1, + false, + "", + ) + archivalMetadata := getArchivalMetadata(controller, c) + clusterMetadata := cluster.NewMockMetadata(controller) + clusterMetadata.EXPECT().GetCurrentClusterName().Return("module-test-cluster-name").AnyTimes() + return fx.Supply( + compileTimeDependencies{}, + cfg, + fx.Annotate(archivalMetadata, fx.As(new(carchiver.ArchivalMetadata))), + fx.Annotate(metrics.NoopMetricsHandler, fx.As(new(metrics.Handler))), + fx.Annotate(clusterMetadata, fx.As(new(cluster.Metadata))), + ) +} + +// compileTimeDependencies is a struct that provides nil implementations of all the dependencies needed for the queue +// module that are not required for the test at runtime. +type compileTimeDependencies struct { + fx.Out + + namespace.Registry + clock.TimeSource + log.SnTaggedLogger + client.Bean + archiver.Client + sdk.ClientFactory + resource.MatchingClient + historyservice.HistoryServiceClient + manager.VisibilityManager + archival.Archiver + workflow.RelocatableAttributesFetcher +} + +// getArchivalMetadata returns a mock ArchivalMetadata that contains the static archival config specified in the given +// test case. +func getArchivalMetadata(controller *gomock.Controller, c *moduleTestCase) *carchiver.MockArchivalMetadata { + archivalMetadata := carchiver.NewMockArchivalMetadata(controller) + historyConfig := carchiver.NewMockArchivalConfig(controller) + visibilityConfig := carchiver.NewMockArchivalConfig(controller) + historyConfig.EXPECT().StaticClusterState().Return(c.HistoryState).AnyTimes() + visibilityConfig.EXPECT().StaticClusterState().Return(c.VisibilityState).AnyTimes() + archivalMetadata.EXPECT().GetHistoryConfig().Return(historyConfig).AnyTimes() + archivalMetadata.EXPECT().GetVisibilityConfig().Return(visibilityConfig).AnyTimes() + return archivalMetadata +}