Skip to content

Commit

Permalink
Implement new APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll committed Nov 14, 2024
1 parent bf46365 commit 5f5746f
Show file tree
Hide file tree
Showing 32 changed files with 2,411 additions and 552 deletions.
397 changes: 229 additions & 168 deletions .gen/proto/matching/v1/service.pb.go

Large diffs are not rendered by default.

287 changes: 144 additions & 143 deletions .gen/proto/matching/v1/service.pb.yarpc.go

Large diffs are not rendered by default.

54 changes: 49 additions & 5 deletions client/matching/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,24 @@ func (c *clientImpl) AddActivityTask(
persistence.TaskListTypeActivity,
request.GetForwardedFrom(),
)
originalTaskListName := request.TaskList.GetName()
request.TaskList.Name = partition
peer, err := c.peerResolver.FromTaskList(request.TaskList.GetName())
if err != nil {
return nil, err
}
return c.client.AddActivityTask(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
resp, err := c.client.AddActivityTask(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
if err != nil {
return nil, err
}
request.TaskList.Name = originalTaskListName
c.provider.UpdatePartitionConfig(
request.GetDomainUUID(),
*request.TaskList,
persistence.TaskListTypeActivity,
resp.PartitionConfig,
)
return resp, nil
}

func (c *clientImpl) AddDecisionTask(
Expand All @@ -84,12 +96,24 @@ func (c *clientImpl) AddDecisionTask(
persistence.TaskListTypeDecision,
request.GetForwardedFrom(),
)
originalTaskListName := request.TaskList.GetName()
request.TaskList.Name = partition
peer, err := c.peerResolver.FromTaskList(request.TaskList.GetName())
if err != nil {
return nil, err
}
return c.client.AddDecisionTask(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
resp, err := c.client.AddDecisionTask(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
if err != nil {
return nil, err
}
request.TaskList.Name = originalTaskListName
c.provider.UpdatePartitionConfig(
request.GetDomainUUID(),
*request.TaskList,
persistence.TaskListTypeDecision,
resp.PartitionConfig,
)
return resp, nil
}

func (c *clientImpl) PollForActivityTask(
Expand All @@ -103,13 +127,25 @@ func (c *clientImpl) PollForActivityTask(
persistence.TaskListTypeActivity,
request.GetForwardedFrom(),
)
originalTaskListName := request.PollRequest.GetTaskList().GetName()
request.PollRequest.TaskList.Name = partition
peer, err := c.peerResolver.FromTaskList(request.PollRequest.TaskList.GetName())
if err != nil {
return nil, err
}
// TODO: update activity response to include backlog count hint and update the weight for partitions
return c.client.PollForActivityTask(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
resp, err := c.client.PollForActivityTask(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
if err != nil {
return nil, err
}
request.PollRequest.TaskList.Name = originalTaskListName
c.provider.UpdatePartitionConfig(
request.GetDomainUUID(),
*request.PollRequest.GetTaskList(),
persistence.TaskListTypeActivity,
resp.PartitionConfig,
)
return resp, nil
}

func (c *clientImpl) PollForDecisionTask(
Expand Down Expand Up @@ -269,13 +305,21 @@ func (c *clientImpl) UpdateTaskListPartitionConfig(
request *types.MatchingUpdateTaskListPartitionConfigRequest,
opts ...yarpc.CallOption,
) (*types.MatchingUpdateTaskListPartitionConfigResponse, error) {
return nil, &types.BadRequestError{}
peer, err := c.peerResolver.FromTaskList(request.TaskList.GetName())
if err != nil {
return nil, err
}
return c.client.UpdateTaskListPartitionConfig(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
}

func (c *clientImpl) RefreshTaskListPartitionConfig(
ctx context.Context,
request *types.MatchingRefreshTaskListPartitionConfigRequest,
opts ...yarpc.CallOption,
) (*types.MatchingRefreshTaskListPartitionConfigResponse, error) {
return nil, &types.BadRequestError{}
peer, err := c.peerResolver.FromTaskList(request.TaskList.GetName())
if err != nil {
return nil, err
}
return c.client.RefreshTaskListPartitionConfig(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
}
95 changes: 95 additions & 0 deletions client/matching/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ func TestClient_withResponse(t *testing.T) {
balancer.EXPECT().PickWritePartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeActivity, "").Return(_testPartition)
p.EXPECT().FromTaskList(_testPartition).Return("peer0", nil)
c.EXPECT().AddActivityTask(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(&types.AddActivityTaskResponse{}, nil)
mp.EXPECT().UpdatePartitionConfig(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeActivity, nil)
},
want: &types.AddActivityTaskResponse{},
},
Expand Down Expand Up @@ -198,6 +199,7 @@ func TestClient_withResponse(t *testing.T) {
balancer.EXPECT().PickWritePartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeDecision, "").Return(_testPartition)
p.EXPECT().FromTaskList(_testPartition).Return("peer0", nil)
c.EXPECT().AddDecisionTask(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(&types.AddDecisionTaskResponse{}, nil)
mp.EXPECT().UpdatePartitionConfig(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeDecision, nil)
},
want: &types.AddDecisionTaskResponse{},
},
Expand Down Expand Up @@ -233,6 +235,7 @@ func TestClient_withResponse(t *testing.T) {
balancer.EXPECT().PickReadPartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeActivity, "").Return(_testPartition)
p.EXPECT().FromTaskList(_testPartition).Return("peer0", nil)
c.EXPECT().PollForActivityTask(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(&types.MatchingPollForActivityTaskResponse{}, nil)
mp.EXPECT().UpdatePartitionConfig(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeActivity, nil)
},
want: &types.MatchingPollForActivityTaskResponse{},
},
Expand Down Expand Up @@ -431,6 +434,74 @@ func TestClient_withResponse(t *testing.T) {
want: nil,
wantError: true,
},
{
name: "UpdateTaskListPartitionConfig",
op: func(c Client) (any, error) {
return c.UpdateTaskListPartitionConfig(context.Background(), testMatchingUpdateTaskListPartitionConfigRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
p.EXPECT().FromTaskList(_testTaskList).Return("peer0", nil)
c.EXPECT().UpdateTaskListPartitionConfig(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(&types.MatchingUpdateTaskListPartitionConfigResponse{}, nil)
},
want: &types.MatchingUpdateTaskListPartitionConfigResponse{},
},
{
name: "UpdateTaskListPartitionConfig - Error in resolving peer",
op: func(c Client) (any, error) {
return c.UpdateTaskListPartitionConfig(context.Background(), testMatchingUpdateTaskListPartitionConfigRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
p.EXPECT().FromTaskList(_testTaskList).Return("peer0", assert.AnError)
},
want: nil,
wantError: true,
},
{
name: "UpdateTaskListPartitionConfig - Error while listing tasklist partitions",
op: func(c Client) (any, error) {
return c.UpdateTaskListPartitionConfig(context.Background(), testMatchingUpdateTaskListPartitionConfigRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
p.EXPECT().FromTaskList(_testTaskList).Return("peer0", nil)
c.EXPECT().UpdateTaskListPartitionConfig(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(nil, assert.AnError)
},
want: nil,
wantError: true,
},
{
name: "RefreshTaskListPartitionConfig",
op: func(c Client) (any, error) {
return c.RefreshTaskListPartitionConfig(context.Background(), testMatchingRefreshTaskListPartitionConfigRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
p.EXPECT().FromTaskList(_testTaskList).Return("peer0", nil)
c.EXPECT().RefreshTaskListPartitionConfig(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(&types.MatchingRefreshTaskListPartitionConfigResponse{}, nil)
},
want: &types.MatchingRefreshTaskListPartitionConfigResponse{},
},
{
name: "RefreshTaskListPartitionConfig - Error in resolving peer",
op: func(c Client) (any, error) {
return c.RefreshTaskListPartitionConfig(context.Background(), testMatchingRefreshTaskListPartitionConfigRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
p.EXPECT().FromTaskList(_testTaskList).Return("peer0", assert.AnError)
},
want: nil,
wantError: true,
},
{
name: "RefreshTaskListPartitionConfig - Error while listing tasklist partitions",
op: func(c Client) (any, error) {
return c.RefreshTaskListPartitionConfig(context.Background(), testMatchingRefreshTaskListPartitionConfigRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
p.EXPECT().FromTaskList(_testTaskList).Return("peer0", nil)
c.EXPECT().RefreshTaskListPartitionConfig(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(nil, assert.AnError)
},
want: nil,
wantError: true,
},
}
for _, tt := range tests {
tt := tt
Expand Down Expand Up @@ -526,3 +597,27 @@ func testGetTaskListsByDomainRequest() *types.GetTaskListsByDomainRequest {
Domain: _testDomain,
}
}

func testMatchingUpdateTaskListPartitionConfigRequest() *types.MatchingUpdateTaskListPartitionConfigRequest {
return &types.MatchingUpdateTaskListPartitionConfigRequest{
DomainUUID: _testDomainUUID,
TaskList: &types.TaskList{Name: _testTaskList},
PartitionConfig: &types.TaskListPartitionConfig{
Version: 1,
NumReadPartitions: 3,
NumWritePartitions: 2,
},
}
}

func testMatchingRefreshTaskListPartitionConfigRequest() *types.MatchingRefreshTaskListPartitionConfigRequest {
return &types.MatchingRefreshTaskListPartitionConfigRequest{
DomainUUID: _testDomainUUID,
TaskList: &types.TaskList{Name: _testTaskList},
PartitionConfig: &types.TaskListPartitionConfig{
Version: 1,
NumReadPartitions: 3,
NumWritePartitions: 2,
},
}
}
48 changes: 33 additions & 15 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -929,6 +929,8 @@ const (
GetDomainAsyncWorkflowConfiguraton
// UpdateDomainAsyncWorkflowConfiguraton is the scope for updating domain async workflow configuration
UpdateDomainAsyncWorkflowConfiguraton
// UpdateTaskListPartitionConfig is the scope for update task list partition config
UpdateTaskListPartitionConfig

NumAdminScopes
)
Expand Down Expand Up @@ -1336,6 +1338,10 @@ const (
MatchingListTaskListPartitionsScope
// MatchingGetTaskListsByDomainScope tracks GetTaskListsByDomain API calls received by service
MatchingGetTaskListsByDomainScope
// MatchingUpdateTaskListPartitionConfigScope tracks UpdateTaskListPartitionConfig API calls received by service
MatchingUpdateTaskListPartitionConfigScope
// MatchingRefreshTaskListPartitionConfigScope tracks RefreshTaskListPartitionConfig API calls received by service
MatchingRefreshTaskListPartitionConfigScope

NumMatchingScopes
)
Expand Down Expand Up @@ -1795,6 +1801,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
UpdateDomainIsolationGroups: {operation: "UpdateDomainIsolationGroups"},
GetDomainAsyncWorkflowConfiguraton: {operation: "GetDomainAsyncWorkflowConfiguraton"},
UpdateDomainAsyncWorkflowConfiguraton: {operation: "UpdateDomainAsyncWorkflowConfiguraton"},
UpdateTaskListPartitionConfig: {operation: "UpdateTaskListPartitionConfig"},

FrontendRestartWorkflowExecutionScope: {operation: "RestartWorkflowExecution"},
FrontendStartWorkflowExecutionScope: {operation: "StartWorkflowExecution"},
Expand Down Expand Up @@ -1984,18 +1991,20 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
},
// Matching Scope Names
Matching: {
MatchingPollForDecisionTaskScope: {operation: "PollForDecisionTask"},
MatchingPollForActivityTaskScope: {operation: "PollForActivityTask"},
MatchingAddActivityTaskScope: {operation: "AddActivityTask"},
MatchingAddDecisionTaskScope: {operation: "AddDecisionTask"},
MatchingAddTaskScope: {operation: "AddTask"},
MatchingTaskListMgrScope: {operation: "TaskListMgr"},
MatchingQueryWorkflowScope: {operation: "QueryWorkflow"},
MatchingRespondQueryTaskCompletedScope: {operation: "RespondQueryTaskCompleted"},
MatchingCancelOutstandingPollScope: {operation: "CancelOutstandingPoll"},
MatchingDescribeTaskListScope: {operation: "DescribeTaskList"},
MatchingListTaskListPartitionsScope: {operation: "ListTaskListPartitions"},
MatchingGetTaskListsByDomainScope: {operation: "GetTaskListsByDomain"},
MatchingPollForDecisionTaskScope: {operation: "PollForDecisionTask"},
MatchingPollForActivityTaskScope: {operation: "PollForActivityTask"},
MatchingAddActivityTaskScope: {operation: "AddActivityTask"},
MatchingAddDecisionTaskScope: {operation: "AddDecisionTask"},
MatchingAddTaskScope: {operation: "AddTask"},
MatchingTaskListMgrScope: {operation: "TaskListMgr"},
MatchingQueryWorkflowScope: {operation: "QueryWorkflow"},
MatchingRespondQueryTaskCompletedScope: {operation: "RespondQueryTaskCompleted"},
MatchingCancelOutstandingPollScope: {operation: "CancelOutstandingPoll"},
MatchingDescribeTaskListScope: {operation: "DescribeTaskList"},
MatchingListTaskListPartitionsScope: {operation: "ListTaskListPartitions"},
MatchingGetTaskListsByDomainScope: {operation: "GetTaskListsByDomain"},
MatchingUpdateTaskListPartitionConfigScope: {operation: "UpdateTaskListPartitionConfig"},
MatchingRefreshTaskListPartitionConfigScope: {operation: "RefreshTaskListPartitionConfig"},
},
// Worker Scope Names
Worker: {
Expand Down Expand Up @@ -2263,6 +2272,10 @@ const (
P2PPeersCount
P2PPeerAdded
P2PPeerRemoved
// task list partition config metrics
TaskListPartitionConfigVersionGauge
TaskListPartitionConfigNumReadGauge
TaskListPartitionConfigNumWriteGauge

NumCommonMetrics // Needs to be last on this list for iota numbering
)
Expand Down Expand Up @@ -2588,6 +2601,7 @@ const (
IsolationTaskMatchPerTaskListCounter
PollerPerTaskListCounter
PollerInvalidIsolationGroupCounter
TaskListPartitionUpdateFailedCounter
TaskListManagersGauge
TaskLagPerTaskListGauge
TaskBacklogPerTaskListGauge
Expand Down Expand Up @@ -2958,9 +2972,12 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
GlobalRatelimiterRemovedLimits: {metricName: "global_ratelimiter_removed_limits", metricType: Histogram, buckets: GlobalRatelimiterUsageHistogram},
GlobalRatelimiterRemovedHostLimits: {metricName: "global_ratelimiter_removed_host_limits", metricType: Histogram, buckets: GlobalRatelimiterUsageHistogram},

P2PPeersCount: {metricName: "peers_count", metricType: Gauge},
P2PPeerAdded: {metricName: "peer_added", metricType: Counter},
P2PPeerRemoved: {metricName: "peer_removed", metricType: Counter},
P2PPeersCount: {metricName: "peers_count", metricType: Gauge},
P2PPeerAdded: {metricName: "peer_added", metricType: Counter},
P2PPeerRemoved: {metricName: "peer_removed", metricType: Counter},
TaskListPartitionConfigVersionGauge: {metricName: "task_list_partition_config_version", metricType: Gauge},
TaskListPartitionConfigNumReadGauge: {metricName: "task_list_partition_config_num_read", metricType: Gauge},
TaskListPartitionConfigNumWriteGauge: {metricName: "task_list_partition_config_num_write", metricType: Gauge},
},
History: {
TaskRequests: {metricName: "task_requests", metricType: Counter},
Expand Down Expand Up @@ -3274,6 +3291,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
IsolationTaskMatchPerTaskListCounter: {metricName: "isolation_task_matches_per_tl", metricType: Counter},
PollerPerTaskListCounter: {metricName: "poller_count_per_tl", metricRollupName: "poller_count"},
PollerInvalidIsolationGroupCounter: {metricName: "poller_invalid_isolation_group_per_tl", metricType: Counter},
TaskListPartitionUpdateFailedCounter: {metricName: "tasklist_partition_update_failed_per_tl", metricType: Counter},
TaskListManagersGauge: {metricName: "tasklist_managers", metricType: Gauge},
TaskLagPerTaskListGauge: {metricName: "task_lag_per_tl", metricType: Gauge},
TaskBacklogPerTaskListGauge: {metricName: "task_backlog_per_tl", metricType: Gauge},
Expand Down
9 changes: 9 additions & 0 deletions common/metrics/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const (
activeCluster = "active_cluster"
taskList = "tasklist"
taskListType = "tasklistType"
taskListRootPartition = "tasklist_root_partition"
workflowType = "workflowType"
activityType = "activityType"
decisionType = "decisionType"
Expand Down Expand Up @@ -175,6 +176,14 @@ func TaskListTypeTag(value string) Tag {
return metricWithUnknown(taskListType, value)
}

// TaskListRootPartition returns a new task list root partition tag.
func TaskListRootPartitionTag(value string) Tag {
if len(value) == 0 {
value = unknownValue
}
return simpleMetric{key: taskListRootPartition, value: sanitizer.Value(value)}
}

// WorkflowTypeTag returns a new workflow type tag.
func WorkflowTypeTag(value string) Tag {
return metricWithUnknown(workflowType, value)
Expand Down
Loading

0 comments on commit 5f5746f

Please sign in to comment.