From 2e97487764f155031776f74a43542b8ab2f36605 Mon Sep 17 00:00:00 2001 From: Zijian Date: Tue, 7 Nov 2023 17:36:38 +0800 Subject: [PATCH] Scaffold GetTaskListSize method at persistence layer --- common/log/tag/values.go | 1 + common/metrics/defs.go | 3 + common/mocks/TaskManager.go | 85 ++++++++++++++++--- common/persistence/dataManagerInterfaces.go | 13 +++ .../persistence/dataManagerInterfaces_mock.go | 15 ++++ .../persistenceErrorInjectionClients.go | 25 ++++++ .../persistence/persistenceMetricClients.go | 17 ++++ .../persistenceRateLimitedClients.go | 10 +++ common/persistence/taskManager.go | 5 ++ 9 files changed, 163 insertions(+), 11 deletions(-) diff --git a/common/log/tag/values.go b/common/log/tag/values.go index 93acc4cff81..f707eae3e88 100644 --- a/common/log/tag/values.go +++ b/common/log/tag/values.go @@ -222,6 +222,7 @@ var ( StoreOperationUpdateTaskList = storeOperation("update-task-list") StoreOperationListTaskList = storeOperation("list-task-list") StoreOperationDeleteTaskList = storeOperation("delete-task-list") + StoreOperationGetTaskListSize = storeOperation("get-task-list-size") StoreOperationStopTaskList = storeOperation("stop-task-list") StoreOperationCreateDomain = storeOperation("create-domain") diff --git a/common/metrics/defs.go b/common/metrics/defs.go index ad123c2da94..45513ceb6d4 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -219,6 +219,8 @@ const ( PersistenceListTaskListScope // PersistenceDeleteTaskListScope is the metric scope for persistence.TaskManager.DeleteTaskList API PersistenceDeleteTaskListScope + // PersistenceGetTaskListSizeScope is the metric scope for persistence.TaskManager.GetTaskListSize API + PersistenceGetTaskListSizeScope // PersistenceAppendHistoryEventsScope tracks AppendHistoryEvents calls made by service to persistence layer PersistenceAppendHistoryEventsScope // PersistenceGetWorkflowExecutionHistoryScope tracks GetWorkflowExecutionHistory calls made by service to persistence layer @@ -1339,6 +1341,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{ PersistenceUpdateTaskListScope: {operation: "UpdateTaskList"}, PersistenceListTaskListScope: {operation: "ListTaskList"}, PersistenceDeleteTaskListScope: {operation: "DeleteTaskList"}, + PersistenceGetTaskListSizeScope: {operation: "GetTaskListSize"}, PersistenceAppendHistoryEventsScope: {operation: "AppendHistoryEvents"}, PersistenceGetWorkflowExecutionHistoryScope: {operation: "GetWorkflowExecutionHistory"}, PersistenceDeleteWorkflowExecutionHistoryScope: {operation: "DeleteWorkflowExecutionHistory"}, diff --git a/common/mocks/TaskManager.go b/common/mocks/TaskManager.go index 745336801f9..2a49b3446d7 100644 --- a/common/mocks/TaskManager.go +++ b/common/mocks/TaskManager.go @@ -1,4 +1,6 @@ -// Copyright (c) 2017-2020 Uber Technologies Inc. +// Modifications Copyright (c) 2020 Uber Technologies Inc. + +// Copyright (c) 2020 Temporal Technologies, Inc. // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -6,10 +8,10 @@ // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: - +// // The above copyright notice and this permission notice shall be included in all // copies or substantial portions of the Software. - +// // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE @@ -18,7 +20,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE // SOFTWARE. -// Code generated by mockery 2.7.4. DO NOT EDIT. +// Code generated by mockery v2.32.0. DO NOT EDIT. package mocks @@ -59,6 +61,10 @@ func (_m *TaskManager) CompleteTasksLessThan(ctx context.Context, request *persi ret := _m.Called(ctx, request) var r0 *persistence.CompleteTasksLessThanResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *persistence.CompleteTasksLessThanRequest) (*persistence.CompleteTasksLessThanResponse, error)); ok { + return rf(ctx, request) + } if rf, ok := ret.Get(0).(func(context.Context, *persistence.CompleteTasksLessThanRequest) *persistence.CompleteTasksLessThanResponse); ok { r0 = rf(ctx, request) } else { @@ -67,7 +73,6 @@ func (_m *TaskManager) CompleteTasksLessThan(ctx context.Context, request *persi } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, *persistence.CompleteTasksLessThanRequest) error); ok { r1 = rf(ctx, request) } else { @@ -82,6 +87,10 @@ func (_m *TaskManager) CreateTasks(ctx context.Context, request *persistence.Cre ret := _m.Called(ctx, request) var r0 *persistence.CreateTasksResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *persistence.CreateTasksRequest) (*persistence.CreateTasksResponse, error)); ok { + return rf(ctx, request) + } if rf, ok := ret.Get(0).(func(context.Context, *persistence.CreateTasksRequest) *persistence.CreateTasksResponse); ok { r0 = rf(ctx, request) } else { @@ -90,7 +99,6 @@ func (_m *TaskManager) CreateTasks(ctx context.Context, request *persistence.Cre } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, *persistence.CreateTasksRequest) error); ok { r1 = rf(ctx, request) } else { @@ -133,6 +141,10 @@ func (_m *TaskManager) GetOrphanTasks(ctx context.Context, request *persistence. ret := _m.Called(ctx, request) var r0 *persistence.GetOrphanTasksResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *persistence.GetOrphanTasksRequest) (*persistence.GetOrphanTasksResponse, error)); ok { + return rf(ctx, request) + } if rf, ok := ret.Get(0).(func(context.Context, *persistence.GetOrphanTasksRequest) *persistence.GetOrphanTasksResponse); ok { r0 = rf(ctx, request) } else { @@ -141,7 +153,6 @@ func (_m *TaskManager) GetOrphanTasks(ctx context.Context, request *persistence. } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, *persistence.GetOrphanTasksRequest) error); ok { r1 = rf(ctx, request) } else { @@ -151,11 +162,41 @@ func (_m *TaskManager) GetOrphanTasks(ctx context.Context, request *persistence. return r0, r1 } +// GetTaskListSize provides a mock function with given fields: ctx, request +func (_m *TaskManager) GetTaskListSize(ctx context.Context, request *persistence.GetTaskListSizeRequest) (*persistence.GetTaskListSizeResponse, error) { + ret := _m.Called(ctx, request) + + var r0 *persistence.GetTaskListSizeResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *persistence.GetTaskListSizeRequest) (*persistence.GetTaskListSizeResponse, error)); ok { + return rf(ctx, request) + } + if rf, ok := ret.Get(0).(func(context.Context, *persistence.GetTaskListSizeRequest) *persistence.GetTaskListSizeResponse); ok { + r0 = rf(ctx, request) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*persistence.GetTaskListSizeResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *persistence.GetTaskListSizeRequest) error); ok { + r1 = rf(ctx, request) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetTasks provides a mock function with given fields: ctx, request func (_m *TaskManager) GetTasks(ctx context.Context, request *persistence.GetTasksRequest) (*persistence.GetTasksResponse, error) { ret := _m.Called(ctx, request) var r0 *persistence.GetTasksResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *persistence.GetTasksRequest) (*persistence.GetTasksResponse, error)); ok { + return rf(ctx, request) + } if rf, ok := ret.Get(0).(func(context.Context, *persistence.GetTasksRequest) *persistence.GetTasksResponse); ok { r0 = rf(ctx, request) } else { @@ -164,7 +205,6 @@ func (_m *TaskManager) GetTasks(ctx context.Context, request *persistence.GetTas } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, *persistence.GetTasksRequest) error); ok { r1 = rf(ctx, request) } else { @@ -179,6 +219,10 @@ func (_m *TaskManager) LeaseTaskList(ctx context.Context, request *persistence.L ret := _m.Called(ctx, request) var r0 *persistence.LeaseTaskListResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *persistence.LeaseTaskListRequest) (*persistence.LeaseTaskListResponse, error)); ok { + return rf(ctx, request) + } if rf, ok := ret.Get(0).(func(context.Context, *persistence.LeaseTaskListRequest) *persistence.LeaseTaskListResponse); ok { r0 = rf(ctx, request) } else { @@ -187,7 +231,6 @@ func (_m *TaskManager) LeaseTaskList(ctx context.Context, request *persistence.L } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, *persistence.LeaseTaskListRequest) error); ok { r1 = rf(ctx, request) } else { @@ -202,6 +245,10 @@ func (_m *TaskManager) ListTaskList(ctx context.Context, request *persistence.Li ret := _m.Called(ctx, request) var r0 *persistence.ListTaskListResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *persistence.ListTaskListRequest) (*persistence.ListTaskListResponse, error)); ok { + return rf(ctx, request) + } if rf, ok := ret.Get(0).(func(context.Context, *persistence.ListTaskListRequest) *persistence.ListTaskListResponse); ok { r0 = rf(ctx, request) } else { @@ -210,7 +257,6 @@ func (_m *TaskManager) ListTaskList(ctx context.Context, request *persistence.Li } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, *persistence.ListTaskListRequest) error); ok { r1 = rf(ctx, request) } else { @@ -225,6 +271,10 @@ func (_m *TaskManager) UpdateTaskList(ctx context.Context, request *persistence. ret := _m.Called(ctx, request) var r0 *persistence.UpdateTaskListResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *persistence.UpdateTaskListRequest) (*persistence.UpdateTaskListResponse, error)); ok { + return rf(ctx, request) + } if rf, ok := ret.Get(0).(func(context.Context, *persistence.UpdateTaskListRequest) *persistence.UpdateTaskListResponse); ok { r0 = rf(ctx, request) } else { @@ -233,7 +283,6 @@ func (_m *TaskManager) UpdateTaskList(ctx context.Context, request *persistence. } } - var r1 error if rf, ok := ret.Get(1).(func(context.Context, *persistence.UpdateTaskListRequest) error); ok { r1 = rf(ctx, request) } else { @@ -242,3 +291,17 @@ func (_m *TaskManager) UpdateTaskList(ctx context.Context, request *persistence. return r0, r1 } + +// NewTaskManager creates a new instance of TaskManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewTaskManager(t interface { + mock.TestingT + Cleanup(func()) +}) *TaskManager { + mock := &TaskManager{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/common/persistence/dataManagerInterfaces.go b/common/persistence/dataManagerInterfaces.go index fdaa301fe0d..e1699252ee1 100644 --- a/common/persistence/dataManagerInterfaces.go +++ b/common/persistence/dataManagerInterfaces.go @@ -1270,6 +1270,18 @@ type ( RangeID int64 } + GetTaskListSizeRequest struct { + DomainID string + DomainName string + TaskListName string + TaskListType int + AckLevel int64 + } + + GetTaskListSizeResponse struct { + Size int + } + // CreateTasksRequest is used to create a new task for a workflow exectution CreateTasksRequest struct { TaskListInfo *TaskListInfo @@ -1793,6 +1805,7 @@ type ( UpdateTaskList(ctx context.Context, request *UpdateTaskListRequest) (*UpdateTaskListResponse, error) ListTaskList(ctx context.Context, request *ListTaskListRequest) (*ListTaskListResponse, error) DeleteTaskList(ctx context.Context, request *DeleteTaskListRequest) error + GetTaskListSize(ctx context.Context, request *GetTaskListSizeRequest) (*GetTaskListSizeResponse, error) CreateTasks(ctx context.Context, request *CreateTasksRequest) (*CreateTasksResponse, error) GetTasks(ctx context.Context, request *GetTasksRequest) (*GetTasksResponse, error) CompleteTask(ctx context.Context, request *CompleteTaskRequest) error diff --git a/common/persistence/dataManagerInterfaces_mock.go b/common/persistence/dataManagerInterfaces_mock.go index 49009b6f2f7..fbfea7e0888 100644 --- a/common/persistence/dataManagerInterfaces_mock.go +++ b/common/persistence/dataManagerInterfaces_mock.go @@ -922,6 +922,21 @@ func (mr *MockTaskManagerMockRecorder) GetOrphanTasks(ctx, request interface{}) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOrphanTasks", reflect.TypeOf((*MockTaskManager)(nil).GetOrphanTasks), ctx, request) } +// GetTaskListSize mocks base method. +func (m *MockTaskManager) GetTaskListSize(ctx context.Context, request *GetTaskListSizeRequest) (*GetTaskListSizeResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetTaskListSize", ctx, request) + ret0, _ := ret[0].(*GetTaskListSizeResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetTaskListSize indicates an expected call of GetTaskListSize. +func (mr *MockTaskManagerMockRecorder) GetTaskListSize(ctx, request interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTaskListSize", reflect.TypeOf((*MockTaskManager)(nil).GetTaskListSize), ctx, request) +} + // GetTasks mocks base method. func (m *MockTaskManager) GetTasks(ctx context.Context, request *GetTasksRequest) (*GetTasksResponse, error) { m.ctrl.T.Helper() diff --git a/common/persistence/persistenceErrorInjectionClients.go b/common/persistence/persistenceErrorInjectionClients.go index 73e9db0a63d..dd18ac1f78a 100644 --- a/common/persistence/persistenceErrorInjectionClients.go +++ b/common/persistence/persistenceErrorInjectionClients.go @@ -1206,6 +1206,31 @@ func (p *taskErrorInjectionPersistenceClient) DeleteTaskList( return persistenceErr } +func (p *taskErrorInjectionPersistenceClient) GetTaskListSize( + ctx context.Context, + request *GetTaskListSizeRequest, +) (*GetTaskListSizeResponse, error) { + fakeErr := generateFakeError(p.errorRate) + + var resp *GetTaskListSizeResponse + var persistenceErr error + var forwardCall bool + if forwardCall = shouldForwardCallToPersistence(fakeErr); forwardCall { + resp, persistenceErr = p.persistence.GetTaskListSize(ctx, request) + } + + if fakeErr != nil { + p.logger.Error(msgInjectedFakeErr, + tag.StoreOperationGetTaskListSize, + tag.Error(fakeErr), + tag.Bool(forwardCall), + tag.StoreError(persistenceErr), + ) + return nil, fakeErr + } + return resp, persistenceErr +} + func (p *taskErrorInjectionPersistenceClient) Close() { p.persistence.Close() } diff --git a/common/persistence/persistenceMetricClients.go b/common/persistence/persistenceMetricClients.go index 9989071d38c..ee1bd3dd048 100644 --- a/common/persistence/persistenceMetricClients.go +++ b/common/persistence/persistenceMetricClients.go @@ -1029,6 +1029,23 @@ func (p *taskPersistenceClient) DeleteTaskList( return p.call(metrics.PersistenceDeleteTaskListScope, op, metrics.DomainTag(request.DomainName)) } +func (p *taskPersistenceClient) GetTaskListSize( + ctx context.Context, + request *GetTaskListSizeRequest, +) (*GetTaskListSizeResponse, error) { + var resp *GetTaskListSizeResponse + op := func() error { + var err error + resp, err = p.persistence.GetTaskListSize(ctx, request) + return err + } + err := p.call(metrics.PersistenceGetTaskListSizeScope, op) + if err != nil { + return nil, err + } + return resp, nil +} + func (p *taskPersistenceClient) UpdateTaskList( ctx context.Context, request *UpdateTaskListRequest, diff --git a/common/persistence/persistenceRateLimitedClients.go b/common/persistence/persistenceRateLimitedClients.go index 090982022ec..ccfe0e62a5b 100644 --- a/common/persistence/persistenceRateLimitedClients.go +++ b/common/persistence/persistenceRateLimitedClients.go @@ -680,6 +680,16 @@ func (p *taskRateLimitedPersistenceClient) DeleteTaskList( return p.persistence.DeleteTaskList(ctx, request) } +func (p *taskRateLimitedPersistenceClient) GetTaskListSize( + ctx context.Context, + request *GetTaskListSizeRequest, +) (*GetTaskListSizeResponse, error) { + if ok := p.rateLimiter.Allow(); !ok { + return nil, ErrPersistenceLimitExceeded + } + return p.persistence.GetTaskListSize(ctx, request) +} + func (p *taskRateLimitedPersistenceClient) Close() { p.persistence.Close() } diff --git a/common/persistence/taskManager.go b/common/persistence/taskManager.go index 104a734ae23..74082083167 100644 --- a/common/persistence/taskManager.go +++ b/common/persistence/taskManager.go @@ -26,6 +26,7 @@ import ( "context" "github.com/uber/cadence/common" + "github.com/uber/cadence/common/types" ) type ( @@ -69,6 +70,10 @@ func (t *taskManager) DeleteTaskList(ctx context.Context, request *DeleteTaskLis return t.persistence.DeleteTaskList(ctx, request) } +func (t *taskManager) GetTaskListSize(ctx context.Context, request *GetTaskListSizeRequest) (*GetTaskListSizeResponse, error) { + return nil, &types.InternalServiceError{Message: "Not yet implemented"} +} + func (t *taskManager) CreateTasks(ctx context.Context, request *CreateTasksRequest) (*CreateTasksResponse, error) { var internalCreateTasks []*InternalCreateTasksInfo for _, task := range request.Tasks {