diff --git a/.gen/go/matching/idl.go b/.gen/go/matching/idl.go index 650983499c1..9d816fb42bc 100644 --- a/.gen/go/matching/idl.go +++ b/.gen/go/matching/idl.go @@ -33,11 +33,11 @@ var ThriftModule = &thriftreflect.ThriftModule{ Name: "matching", Package: "github.com/uber/cadence/.gen/go/matching", FilePath: "matching.thrift", - SHA1: "a31a01261c9ca179cf337f936592c103ebfc2131", + SHA1: "f322ae5922feb648ea835f92ab09f3b3c3b2d145", Includes: []*thriftreflect.ThriftModule{ shared.ThriftModule, }, Raw: rawIDL, } -const rawIDL = "// Copyright (c) 2017 Uber Technologies, Inc.\n//\n// Permission is hereby granted, free of charge, to any person obtaining a copy\n// of this software and associated documentation files (the \"Software\"), to deal\n// in the Software without restriction, including without limitation the rights\n// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell\n// copies of the Software, and to permit persons to whom the Software is\n// furnished to do so, subject to the following conditions:\n//\n// The above copyright notice and this permission notice shall be included in\n// all copies or substantial portions of the Software.\n//\n// THE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\n// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,\n// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE\n// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER\n// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,\n// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN\n// THE SOFTWARE.\n\ninclude \"shared.thrift\"\n\nnamespace java com.uber.cadence.matching\n\nstruct PollForDecisionTaskRequest {\n 10: optional string domainUUID\n 15: optional string pollerID\n 20: optional shared.PollForDecisionTaskRequest pollRequest\n}\n\nstruct PollForDecisionTaskResponse {\n 10: optional binary taskToken\n 20: optional shared.WorkflowExecution workflowExecution\n 30: optional shared.WorkflowType workflowType\n 40: optional i64 (js.type = \"Long\") previousStartedEventId\n 50: optional i64 (js.type = \"Long\") startedEventId\n 51: optional i64 (js.type = \"Long\") attempt\n 60: optional i64 (js.type = \"Long\") nextEventId\n 65: optional i64 (js.type = \"Long\") backlogCountHint\n 70: optional bool stickyExecutionEnabled\n 80: optional shared.WorkflowQuery query\n 90: optional shared.TransientDecisionInfo decisionInfo\n}\n\nstruct PollForActivityTaskRequest {\n 10: optional string domainUUID\n 15: optional string pollerID\n 20: optional shared.PollForActivityTaskRequest pollRequest\n}\n\nstruct AddDecisionTaskRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution execution\n 30: optional shared.TaskList taskList\n 40: optional i64 (js.type = \"Long\") scheduleId\n 50: optional i32 scheduleToStartTimeoutSeconds\n}\n\nstruct AddActivityTaskRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution execution\n 30: optional string sourceDomainUUID\n 40: optional shared.TaskList taskList\n 50: optional i64 (js.type = \"Long\") scheduleId\n 60: optional i32 scheduleToStartTimeoutSeconds\n}\n\nstruct QueryWorkflowRequest {\n 10: optional string domainUUID\n 20: optional shared.TaskList taskList\n 30: optional shared.QueryWorkflowRequest queryRequest\n}\n\nstruct RespondQueryTaskCompletedRequest {\n 10: optional string domainUUID\n 20: optional shared.TaskList taskList\n 30: optional string taskID\n 40: optional shared.RespondQueryTaskCompletedRequest completedRequest\n}\n\nstruct CancelOutstandingPollRequest {\n 10: optional string domainUUID\n 20: optional i32 taskListType\n 30: optional shared.TaskList taskList\n 40: optional string pollerID\n}\n\nstruct DescribeTaskListRequest {\n 10: optional string domainUUID\n 20: optional shared.DescribeTaskListRequest descRequest\n}\n\n/**\n* MatchingService API is exposed to provide support for polling from long running applications.\n* Such applications are expected to have a worker which regularly polls for DecisionTask and ActivityTask. For each\n* DecisionTask, application is expected to process the history of events for that session and respond back with next\n* decisions. For each ActivityTask, application is expected to execute the actual logic for that task and respond back\n* with completion or failure.\n**/\nservice MatchingService {\n /**\n * PollForDecisionTask is called by frontend to process DecisionTask from a specific taskList. A\n * DecisionTask is dispatched to callers for active workflow executions, with pending decisions.\n **/\n PollForDecisionTaskResponse PollForDecisionTask(1: PollForDecisionTaskRequest pollRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * PollForActivityTask is called by frontend to process ActivityTask from a specific taskList. ActivityTask\n * is dispatched to callers whenever a ScheduleTask decision is made for a workflow execution.\n **/\n shared.PollForActivityTaskResponse PollForActivityTask(1: PollForActivityTaskRequest pollRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * AddDecisionTask is called by the history service when a decision task is scheduled, so that it can be dispatched\n * by the MatchingEngine.\n **/\n void AddDecisionTask(1: AddDecisionTaskRequest addRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.ServiceBusyError serviceBusyError,\n 4: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * AddActivityTask is called by the history service when a decision task is scheduled, so that it can be dispatched\n * by the MatchingEngine.\n **/\n void AddActivityTask(1: AddActivityTaskRequest addRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.ServiceBusyError serviceBusyError,\n 4: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * QueryWorkflow is called by frontend to query a workflow.\n **/\n shared.QueryWorkflowResponse QueryWorkflow(1: QueryWorkflowRequest queryRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: shared.QueryFailedError queryFailedError,\n 5: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * RespondQueryTaskCompleted is called by frontend to respond query completed.\n **/\n void RespondQueryTaskCompleted(1: RespondQueryTaskCompletedRequest request)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * CancelOutstandingPoll is called by frontend to unblock long polls on matching for zombie pollers.\n * Our rpc stack does not support context propagation, so when a client connection goes away frontend sees\n * cancellation of context for that handler, but any corresponding calls (long-poll) to matching service does not\n * see the cancellation propagated so it can unblock corresponding long-polls on its end. This results is tasks\n * being dispatched to zombie pollers in this situation. This API is added so everytime frontend makes a long-poll\n * api call to matching it passes in a pollerID and then calls this API when it detects client connection is closed\n * to unblock long polls for this poller and prevent tasks being sent to these zombie pollers.\n **/\n void CancelOutstandingPoll(1: CancelOutstandingPollRequest request)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n )\n\n /**\n * DescribeTaskList returns information about the target tasklist, right now this API returns the\n * pollers which polled this tasklist in last few minutes.\n **/\n shared.DescribeTaskListResponse DescribeTaskList(1: DescribeTaskListRequest request)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n )\n}\n" +const rawIDL = "// Copyright (c) 2017 Uber Technologies, Inc.\n//\n// Permission is hereby granted, free of charge, to any person obtaining a copy\n// of this software and associated documentation files (the \"Software\"), to deal\n// in the Software without restriction, including without limitation the rights\n// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell\n// copies of the Software, and to permit persons to whom the Software is\n// furnished to do so, subject to the following conditions:\n//\n// The above copyright notice and this permission notice shall be included in\n// all copies or substantial portions of the Software.\n//\n// THE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\n// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,\n// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE\n// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER\n// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,\n// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN\n// THE SOFTWARE.\n\ninclude \"shared.thrift\"\n\nnamespace java com.uber.cadence.matching\n\nstruct PollForDecisionTaskRequest {\n 10: optional string domainUUID\n 15: optional string pollerID\n 20: optional shared.PollForDecisionTaskRequest pollRequest\n}\n\nstruct PollForDecisionTaskResponse {\n 10: optional binary taskToken\n 20: optional shared.WorkflowExecution workflowExecution\n 30: optional shared.WorkflowType workflowType\n 40: optional i64 (js.type = \"Long\") previousStartedEventId\n 50: optional i64 (js.type = \"Long\") startedEventId\n 51: optional i64 (js.type = \"Long\") attempt\n 60: optional i64 (js.type = \"Long\") nextEventId\n 65: optional i64 (js.type = \"Long\") backlogCountHint\n 70: optional bool stickyExecutionEnabled\n 80: optional shared.WorkflowQuery query\n 90: optional shared.TransientDecisionInfo decisionInfo\n}\n\nstruct PollForActivityTaskRequest {\n 10: optional string domainUUID\n 15: optional string pollerID\n 20: optional shared.PollForActivityTaskRequest pollRequest\n}\n\nstruct AddDecisionTaskRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution execution\n 30: optional shared.TaskList taskList\n 40: optional i64 (js.type = \"Long\") scheduleId\n 50: optional i32 scheduleToStartTimeoutSeconds\n}\n\nstruct AddActivityTaskRequest {\n 10: optional string domainUUID\n 20: optional shared.WorkflowExecution execution\n 30: optional string sourceDomainUUID\n 40: optional shared.TaskList taskList\n 50: optional i64 (js.type = \"Long\") scheduleId\n 60: optional i32 scheduleToStartTimeoutSeconds\n}\n\nstruct QueryWorkflowRequest {\n 10: optional string domainUUID\n 20: optional shared.TaskList taskList\n 30: optional shared.QueryWorkflowRequest queryRequest\n}\n\nstruct RespondQueryTaskCompletedRequest {\n 10: optional string domainUUID\n 20: optional shared.TaskList taskList\n 30: optional string taskID\n 40: optional shared.RespondQueryTaskCompletedRequest completedRequest\n}\n\nstruct CancelOutstandingPollRequest {\n 10: optional string domainUUID\n 20: optional i32 taskListType\n 30: optional shared.TaskList taskList\n 40: optional string pollerID\n}\n\nstruct DescribeTaskListRequest {\n 10: optional string domainUUID\n 20: optional shared.DescribeTaskListRequest descRequest\n}\n\n/**\n* MatchingService API is exposed to provide support for polling from long running applications.\n* Such applications are expected to have a worker which regularly polls for DecisionTask and ActivityTask. For each\n* DecisionTask, application is expected to process the history of events for that session and respond back with next\n* decisions. For each ActivityTask, application is expected to execute the actual logic for that task and respond back\n* with completion or failure.\n**/\nservice MatchingService {\n /**\n * PollForDecisionTask is called by frontend to process DecisionTask from a specific taskList. A\n * DecisionTask is dispatched to callers for active workflow executions, with pending decisions.\n **/\n PollForDecisionTaskResponse PollForDecisionTask(1: PollForDecisionTaskRequest pollRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.LimitExceededError limitExceededError,\n 4: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * PollForActivityTask is called by frontend to process ActivityTask from a specific taskList. ActivityTask\n * is dispatched to callers whenever a ScheduleTask decision is made for a workflow execution.\n **/\n shared.PollForActivityTaskResponse PollForActivityTask(1: PollForActivityTaskRequest pollRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.LimitExceededError limitExceededError,\n 4: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * AddDecisionTask is called by the history service when a decision task is scheduled, so that it can be dispatched\n * by the MatchingEngine.\n **/\n void AddDecisionTask(1: AddDecisionTaskRequest addRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.ServiceBusyError serviceBusyError,\n 4: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * AddActivityTask is called by the history service when a decision task is scheduled, so that it can be dispatched\n * by the MatchingEngine.\n **/\n void AddActivityTask(1: AddActivityTaskRequest addRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.ServiceBusyError serviceBusyError,\n 4: shared.LimitExceededError limitExceededError,\n )\n\n /**\n * QueryWorkflow is called by frontend to query a workflow.\n **/\n shared.QueryWorkflowResponse QueryWorkflow(1: QueryWorkflowRequest queryRequest)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: shared.QueryFailedError queryFailedError,\n 5: shared.LimitExceededError limitExceededError,\n 6: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * RespondQueryTaskCompleted is called by frontend to respond query completed.\n **/\n void RespondQueryTaskCompleted(1: RespondQueryTaskCompletedRequest request)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: shared.LimitExceededError limitExceededError,\n 5: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * CancelOutstandingPoll is called by frontend to unblock long polls on matching for zombie pollers.\n * Our rpc stack does not support context propagation, so when a client connection goes away frontend sees\n * cancellation of context for that handler, but any corresponding calls (long-poll) to matching service does not\n * see the cancellation propagated so it can unblock corresponding long-polls on its end. This results is tasks\n * being dispatched to zombie pollers in this situation. This API is added so everytime frontend makes a long-poll\n * api call to matching it passes in a pollerID and then calls this API when it detects client connection is closed\n * to unblock long polls for this poller and prevent tasks being sent to these zombie pollers.\n **/\n void CancelOutstandingPoll(1: CancelOutstandingPollRequest request)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.ServiceBusyError serviceBusyError,\n )\n\n /**\n * DescribeTaskList returns information about the target tasklist, right now this API returns the\n * pollers which polled this tasklist in last few minutes.\n **/\n shared.DescribeTaskListResponse DescribeTaskList(1: DescribeTaskListRequest request)\n throws (\n 1: shared.BadRequestError badRequestError,\n 2: shared.InternalServiceError internalServiceError,\n 3: shared.EntityNotExistsError entityNotExistError,\n 4: shared.ServiceBusyError serviceBusyError,\n )\n}\n" diff --git a/.gen/go/matching/matchingservice_canceloutstandingpoll.go b/.gen/go/matching/matchingservice_canceloutstandingpoll.go index 5973d417a06..c500347204a 100644 --- a/.gen/go/matching/matchingservice_canceloutstandingpoll.go +++ b/.gen/go/matching/matchingservice_canceloutstandingpoll.go @@ -220,6 +220,8 @@ func init() { return true case *shared.InternalServiceError: return true + case *shared.ServiceBusyError: + return true default: return false } @@ -241,6 +243,11 @@ func init() { return nil, errors.New("WrapResponse received non-nil error type with nil value for MatchingService_CancelOutstandingPoll_Result.InternalServiceError") } return &MatchingService_CancelOutstandingPoll_Result{InternalServiceError: e}, nil + case *shared.ServiceBusyError: + if e == nil { + return nil, errors.New("WrapResponse received non-nil error type with nil value for MatchingService_CancelOutstandingPoll_Result.ServiceBusyError") + } + return &MatchingService_CancelOutstandingPoll_Result{ServiceBusyError: e}, nil } return nil, err @@ -254,6 +261,10 @@ func init() { err = result.InternalServiceError return } + if result.ServiceBusyError != nil { + err = result.ServiceBusyError + return + } return } @@ -265,6 +276,7 @@ func init() { type MatchingService_CancelOutstandingPoll_Result struct { BadRequestError *shared.BadRequestError `json:"badRequestError,omitempty"` InternalServiceError *shared.InternalServiceError `json:"internalServiceError,omitempty"` + ServiceBusyError *shared.ServiceBusyError `json:"serviceBusyError,omitempty"` } // ToWire translates a MatchingService_CancelOutstandingPoll_Result struct into a Thrift-level intermediate @@ -284,7 +296,7 @@ type MatchingService_CancelOutstandingPoll_Result struct { // } func (v *MatchingService_CancelOutstandingPoll_Result) ToWire() (wire.Value, error) { var ( - fields [2]wire.Field + fields [3]wire.Field i int = 0 w wire.Value err error @@ -306,6 +318,14 @@ func (v *MatchingService_CancelOutstandingPoll_Result) ToWire() (wire.Value, err fields[i] = wire.Field{ID: 2, Value: w} i++ } + if v.ServiceBusyError != nil { + w, err = v.ServiceBusyError.ToWire() + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 3, Value: w} + i++ + } if i > 1 { return wire.Value{}, fmt.Errorf("MatchingService_CancelOutstandingPoll_Result should have at most one field: got %v fields", i) @@ -351,6 +371,14 @@ func (v *MatchingService_CancelOutstandingPoll_Result) FromWire(w wire.Value) er return err } + } + case 3: + if field.Value.Type() == wire.TStruct { + v.ServiceBusyError, err = _ServiceBusyError_Read(field.Value) + if err != nil { + return err + } + } } } @@ -362,6 +390,9 @@ func (v *MatchingService_CancelOutstandingPoll_Result) FromWire(w wire.Value) er if v.InternalServiceError != nil { count++ } + if v.ServiceBusyError != nil { + count++ + } if count > 1 { return fmt.Errorf("MatchingService_CancelOutstandingPoll_Result should have at most one field: got %v fields", count) } @@ -376,7 +407,7 @@ func (v *MatchingService_CancelOutstandingPoll_Result) String() string { return "" } - var fields [2]string + var fields [3]string i := 0 if v.BadRequestError != nil { fields[i] = fmt.Sprintf("BadRequestError: %v", v.BadRequestError) @@ -386,6 +417,10 @@ func (v *MatchingService_CancelOutstandingPoll_Result) String() string { fields[i] = fmt.Sprintf("InternalServiceError: %v", v.InternalServiceError) i++ } + if v.ServiceBusyError != nil { + fields[i] = fmt.Sprintf("ServiceBusyError: %v", v.ServiceBusyError) + i++ + } return fmt.Sprintf("MatchingService_CancelOutstandingPoll_Result{%v}", strings.Join(fields[:i], ", ")) } @@ -401,6 +436,9 @@ func (v *MatchingService_CancelOutstandingPoll_Result) Equals(rhs *MatchingServi if !((v.InternalServiceError == nil && rhs.InternalServiceError == nil) || (v.InternalServiceError != nil && rhs.InternalServiceError != nil && v.InternalServiceError.Equals(rhs.InternalServiceError))) { return false } + if !((v.ServiceBusyError == nil && rhs.ServiceBusyError == nil) || (v.ServiceBusyError != nil && rhs.ServiceBusyError != nil && v.ServiceBusyError.Equals(rhs.ServiceBusyError))) { + return false + } return true } diff --git a/.gen/go/matching/matchingservice_describetasklist.go b/.gen/go/matching/matchingservice_describetasklist.go index 6a4515d346f..24a19e43cee 100644 --- a/.gen/go/matching/matchingservice_describetasklist.go +++ b/.gen/go/matching/matchingservice_describetasklist.go @@ -221,6 +221,8 @@ func init() { return true case *shared.EntityNotExistsError: return true + case *shared.ServiceBusyError: + return true default: return false } @@ -247,6 +249,11 @@ func init() { return nil, errors.New("WrapResponse received non-nil error type with nil value for MatchingService_DescribeTaskList_Result.EntityNotExistError") } return &MatchingService_DescribeTaskList_Result{EntityNotExistError: e}, nil + case *shared.ServiceBusyError: + if e == nil { + return nil, errors.New("WrapResponse received non-nil error type with nil value for MatchingService_DescribeTaskList_Result.ServiceBusyError") + } + return &MatchingService_DescribeTaskList_Result{ServiceBusyError: e}, nil } return nil, err @@ -264,6 +271,10 @@ func init() { err = result.EntityNotExistError return } + if result.ServiceBusyError != nil { + err = result.ServiceBusyError + return + } if result.Success != nil { success = result.Success @@ -287,6 +298,7 @@ type MatchingService_DescribeTaskList_Result struct { BadRequestError *shared.BadRequestError `json:"badRequestError,omitempty"` InternalServiceError *shared.InternalServiceError `json:"internalServiceError,omitempty"` EntityNotExistError *shared.EntityNotExistsError `json:"entityNotExistError,omitempty"` + ServiceBusyError *shared.ServiceBusyError `json:"serviceBusyError,omitempty"` } // ToWire translates a MatchingService_DescribeTaskList_Result struct into a Thrift-level intermediate @@ -306,7 +318,7 @@ type MatchingService_DescribeTaskList_Result struct { // } func (v *MatchingService_DescribeTaskList_Result) ToWire() (wire.Value, error) { var ( - fields [4]wire.Field + fields [5]wire.Field i int = 0 w wire.Value err error @@ -344,6 +356,14 @@ func (v *MatchingService_DescribeTaskList_Result) ToWire() (wire.Value, error) { fields[i] = wire.Field{ID: 3, Value: w} i++ } + if v.ServiceBusyError != nil { + w, err = v.ServiceBusyError.ToWire() + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 4, Value: w} + i++ + } if i != 1 { return wire.Value{}, fmt.Errorf("MatchingService_DescribeTaskList_Result should have exactly one field: got %v fields", i) @@ -417,6 +437,14 @@ func (v *MatchingService_DescribeTaskList_Result) FromWire(w wire.Value) error { return err } + } + case 4: + if field.Value.Type() == wire.TStruct { + v.ServiceBusyError, err = _ServiceBusyError_Read(field.Value) + if err != nil { + return err + } + } } } @@ -434,6 +462,9 @@ func (v *MatchingService_DescribeTaskList_Result) FromWire(w wire.Value) error { if v.EntityNotExistError != nil { count++ } + if v.ServiceBusyError != nil { + count++ + } if count != 1 { return fmt.Errorf("MatchingService_DescribeTaskList_Result should have exactly one field: got %v fields", count) } @@ -448,7 +479,7 @@ func (v *MatchingService_DescribeTaskList_Result) String() string { return "" } - var fields [4]string + var fields [5]string i := 0 if v.Success != nil { fields[i] = fmt.Sprintf("Success: %v", v.Success) @@ -466,6 +497,10 @@ func (v *MatchingService_DescribeTaskList_Result) String() string { fields[i] = fmt.Sprintf("EntityNotExistError: %v", v.EntityNotExistError) i++ } + if v.ServiceBusyError != nil { + fields[i] = fmt.Sprintf("ServiceBusyError: %v", v.ServiceBusyError) + i++ + } return fmt.Sprintf("MatchingService_DescribeTaskList_Result{%v}", strings.Join(fields[:i], ", ")) } @@ -487,6 +522,9 @@ func (v *MatchingService_DescribeTaskList_Result) Equals(rhs *MatchingService_De if !((v.EntityNotExistError == nil && rhs.EntityNotExistError == nil) || (v.EntityNotExistError != nil && rhs.EntityNotExistError != nil && v.EntityNotExistError.Equals(rhs.EntityNotExistError))) { return false } + if !((v.ServiceBusyError == nil && rhs.ServiceBusyError == nil) || (v.ServiceBusyError != nil && rhs.ServiceBusyError != nil && v.ServiceBusyError.Equals(rhs.ServiceBusyError))) { + return false + } return true } diff --git a/.gen/go/matching/matchingservice_pollforactivitytask.go b/.gen/go/matching/matchingservice_pollforactivitytask.go index f3b88db1b2a..792643c1e61 100644 --- a/.gen/go/matching/matchingservice_pollforactivitytask.go +++ b/.gen/go/matching/matchingservice_pollforactivitytask.go @@ -221,6 +221,8 @@ func init() { return true case *shared.LimitExceededError: return true + case *shared.ServiceBusyError: + return true default: return false } @@ -247,6 +249,11 @@ func init() { return nil, errors.New("WrapResponse received non-nil error type with nil value for MatchingService_PollForActivityTask_Result.LimitExceededError") } return &MatchingService_PollForActivityTask_Result{LimitExceededError: e}, nil + case *shared.ServiceBusyError: + if e == nil { + return nil, errors.New("WrapResponse received non-nil error type with nil value for MatchingService_PollForActivityTask_Result.ServiceBusyError") + } + return &MatchingService_PollForActivityTask_Result{ServiceBusyError: e}, nil } return nil, err @@ -264,6 +271,10 @@ func init() { err = result.LimitExceededError return } + if result.ServiceBusyError != nil { + err = result.ServiceBusyError + return + } if result.Success != nil { success = result.Success @@ -287,6 +298,7 @@ type MatchingService_PollForActivityTask_Result struct { BadRequestError *shared.BadRequestError `json:"badRequestError,omitempty"` InternalServiceError *shared.InternalServiceError `json:"internalServiceError,omitempty"` LimitExceededError *shared.LimitExceededError `json:"limitExceededError,omitempty"` + ServiceBusyError *shared.ServiceBusyError `json:"serviceBusyError,omitempty"` } // ToWire translates a MatchingService_PollForActivityTask_Result struct into a Thrift-level intermediate @@ -306,7 +318,7 @@ type MatchingService_PollForActivityTask_Result struct { // } func (v *MatchingService_PollForActivityTask_Result) ToWire() (wire.Value, error) { var ( - fields [4]wire.Field + fields [5]wire.Field i int = 0 w wire.Value err error @@ -344,6 +356,14 @@ func (v *MatchingService_PollForActivityTask_Result) ToWire() (wire.Value, error fields[i] = wire.Field{ID: 3, Value: w} i++ } + if v.ServiceBusyError != nil { + w, err = v.ServiceBusyError.ToWire() + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 4, Value: w} + i++ + } if i != 1 { return wire.Value{}, fmt.Errorf("MatchingService_PollForActivityTask_Result should have exactly one field: got %v fields", i) @@ -411,6 +431,14 @@ func (v *MatchingService_PollForActivityTask_Result) FromWire(w wire.Value) erro return err } + } + case 4: + if field.Value.Type() == wire.TStruct { + v.ServiceBusyError, err = _ServiceBusyError_Read(field.Value) + if err != nil { + return err + } + } } } @@ -428,6 +456,9 @@ func (v *MatchingService_PollForActivityTask_Result) FromWire(w wire.Value) erro if v.LimitExceededError != nil { count++ } + if v.ServiceBusyError != nil { + count++ + } if count != 1 { return fmt.Errorf("MatchingService_PollForActivityTask_Result should have exactly one field: got %v fields", count) } @@ -442,7 +473,7 @@ func (v *MatchingService_PollForActivityTask_Result) String() string { return "" } - var fields [4]string + var fields [5]string i := 0 if v.Success != nil { fields[i] = fmt.Sprintf("Success: %v", v.Success) @@ -460,6 +491,10 @@ func (v *MatchingService_PollForActivityTask_Result) String() string { fields[i] = fmt.Sprintf("LimitExceededError: %v", v.LimitExceededError) i++ } + if v.ServiceBusyError != nil { + fields[i] = fmt.Sprintf("ServiceBusyError: %v", v.ServiceBusyError) + i++ + } return fmt.Sprintf("MatchingService_PollForActivityTask_Result{%v}", strings.Join(fields[:i], ", ")) } @@ -481,6 +516,9 @@ func (v *MatchingService_PollForActivityTask_Result) Equals(rhs *MatchingService if !((v.LimitExceededError == nil && rhs.LimitExceededError == nil) || (v.LimitExceededError != nil && rhs.LimitExceededError != nil && v.LimitExceededError.Equals(rhs.LimitExceededError))) { return false } + if !((v.ServiceBusyError == nil && rhs.ServiceBusyError == nil) || (v.ServiceBusyError != nil && rhs.ServiceBusyError != nil && v.ServiceBusyError.Equals(rhs.ServiceBusyError))) { + return false + } return true } diff --git a/.gen/go/matching/matchingservice_pollfordecisiontask.go b/.gen/go/matching/matchingservice_pollfordecisiontask.go index fd973948d14..88ce66e4d8e 100644 --- a/.gen/go/matching/matchingservice_pollfordecisiontask.go +++ b/.gen/go/matching/matchingservice_pollfordecisiontask.go @@ -221,6 +221,8 @@ func init() { return true case *shared.LimitExceededError: return true + case *shared.ServiceBusyError: + return true default: return false } @@ -247,6 +249,11 @@ func init() { return nil, errors.New("WrapResponse received non-nil error type with nil value for MatchingService_PollForDecisionTask_Result.LimitExceededError") } return &MatchingService_PollForDecisionTask_Result{LimitExceededError: e}, nil + case *shared.ServiceBusyError: + if e == nil { + return nil, errors.New("WrapResponse received non-nil error type with nil value for MatchingService_PollForDecisionTask_Result.ServiceBusyError") + } + return &MatchingService_PollForDecisionTask_Result{ServiceBusyError: e}, nil } return nil, err @@ -264,6 +271,10 @@ func init() { err = result.LimitExceededError return } + if result.ServiceBusyError != nil { + err = result.ServiceBusyError + return + } if result.Success != nil { success = result.Success @@ -287,6 +298,7 @@ type MatchingService_PollForDecisionTask_Result struct { BadRequestError *shared.BadRequestError `json:"badRequestError,omitempty"` InternalServiceError *shared.InternalServiceError `json:"internalServiceError,omitempty"` LimitExceededError *shared.LimitExceededError `json:"limitExceededError,omitempty"` + ServiceBusyError *shared.ServiceBusyError `json:"serviceBusyError,omitempty"` } // ToWire translates a MatchingService_PollForDecisionTask_Result struct into a Thrift-level intermediate @@ -306,7 +318,7 @@ type MatchingService_PollForDecisionTask_Result struct { // } func (v *MatchingService_PollForDecisionTask_Result) ToWire() (wire.Value, error) { var ( - fields [4]wire.Field + fields [5]wire.Field i int = 0 w wire.Value err error @@ -344,6 +356,14 @@ func (v *MatchingService_PollForDecisionTask_Result) ToWire() (wire.Value, error fields[i] = wire.Field{ID: 3, Value: w} i++ } + if v.ServiceBusyError != nil { + w, err = v.ServiceBusyError.ToWire() + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 4, Value: w} + i++ + } if i != 1 { return wire.Value{}, fmt.Errorf("MatchingService_PollForDecisionTask_Result should have exactly one field: got %v fields", i) @@ -411,6 +431,14 @@ func (v *MatchingService_PollForDecisionTask_Result) FromWire(w wire.Value) erro return err } + } + case 4: + if field.Value.Type() == wire.TStruct { + v.ServiceBusyError, err = _ServiceBusyError_Read(field.Value) + if err != nil { + return err + } + } } } @@ -428,6 +456,9 @@ func (v *MatchingService_PollForDecisionTask_Result) FromWire(w wire.Value) erro if v.LimitExceededError != nil { count++ } + if v.ServiceBusyError != nil { + count++ + } if count != 1 { return fmt.Errorf("MatchingService_PollForDecisionTask_Result should have exactly one field: got %v fields", count) } @@ -442,7 +473,7 @@ func (v *MatchingService_PollForDecisionTask_Result) String() string { return "" } - var fields [4]string + var fields [5]string i := 0 if v.Success != nil { fields[i] = fmt.Sprintf("Success: %v", v.Success) @@ -460,6 +491,10 @@ func (v *MatchingService_PollForDecisionTask_Result) String() string { fields[i] = fmt.Sprintf("LimitExceededError: %v", v.LimitExceededError) i++ } + if v.ServiceBusyError != nil { + fields[i] = fmt.Sprintf("ServiceBusyError: %v", v.ServiceBusyError) + i++ + } return fmt.Sprintf("MatchingService_PollForDecisionTask_Result{%v}", strings.Join(fields[:i], ", ")) } @@ -481,6 +516,9 @@ func (v *MatchingService_PollForDecisionTask_Result) Equals(rhs *MatchingService if !((v.LimitExceededError == nil && rhs.LimitExceededError == nil) || (v.LimitExceededError != nil && rhs.LimitExceededError != nil && v.LimitExceededError.Equals(rhs.LimitExceededError))) { return false } + if !((v.ServiceBusyError == nil && rhs.ServiceBusyError == nil) || (v.ServiceBusyError != nil && rhs.ServiceBusyError != nil && v.ServiceBusyError.Equals(rhs.ServiceBusyError))) { + return false + } return true } diff --git a/.gen/go/matching/matchingservice_queryworkflow.go b/.gen/go/matching/matchingservice_queryworkflow.go index a82accc99ee..484c02153d5 100644 --- a/.gen/go/matching/matchingservice_queryworkflow.go +++ b/.gen/go/matching/matchingservice_queryworkflow.go @@ -225,6 +225,8 @@ func init() { return true case *shared.LimitExceededError: return true + case *shared.ServiceBusyError: + return true default: return false } @@ -261,6 +263,11 @@ func init() { return nil, errors.New("WrapResponse received non-nil error type with nil value for MatchingService_QueryWorkflow_Result.LimitExceededError") } return &MatchingService_QueryWorkflow_Result{LimitExceededError: e}, nil + case *shared.ServiceBusyError: + if e == nil { + return nil, errors.New("WrapResponse received non-nil error type with nil value for MatchingService_QueryWorkflow_Result.ServiceBusyError") + } + return &MatchingService_QueryWorkflow_Result{ServiceBusyError: e}, nil } return nil, err @@ -286,6 +293,10 @@ func init() { err = result.LimitExceededError return } + if result.ServiceBusyError != nil { + err = result.ServiceBusyError + return + } if result.Success != nil { success = result.Success @@ -311,6 +322,7 @@ type MatchingService_QueryWorkflow_Result struct { EntityNotExistError *shared.EntityNotExistsError `json:"entityNotExistError,omitempty"` QueryFailedError *shared.QueryFailedError `json:"queryFailedError,omitempty"` LimitExceededError *shared.LimitExceededError `json:"limitExceededError,omitempty"` + ServiceBusyError *shared.ServiceBusyError `json:"serviceBusyError,omitempty"` } // ToWire translates a MatchingService_QueryWorkflow_Result struct into a Thrift-level intermediate @@ -330,7 +342,7 @@ type MatchingService_QueryWorkflow_Result struct { // } func (v *MatchingService_QueryWorkflow_Result) ToWire() (wire.Value, error) { var ( - fields [6]wire.Field + fields [7]wire.Field i int = 0 w wire.Value err error @@ -384,6 +396,14 @@ func (v *MatchingService_QueryWorkflow_Result) ToWire() (wire.Value, error) { fields[i] = wire.Field{ID: 5, Value: w} i++ } + if v.ServiceBusyError != nil { + w, err = v.ServiceBusyError.ToWire() + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 6, Value: w} + i++ + } if i != 1 { return wire.Value{}, fmt.Errorf("MatchingService_QueryWorkflow_Result should have exactly one field: got %v fields", i) @@ -473,6 +493,14 @@ func (v *MatchingService_QueryWorkflow_Result) FromWire(w wire.Value) error { return err } + } + case 6: + if field.Value.Type() == wire.TStruct { + v.ServiceBusyError, err = _ServiceBusyError_Read(field.Value) + if err != nil { + return err + } + } } } @@ -496,6 +524,9 @@ func (v *MatchingService_QueryWorkflow_Result) FromWire(w wire.Value) error { if v.LimitExceededError != nil { count++ } + if v.ServiceBusyError != nil { + count++ + } if count != 1 { return fmt.Errorf("MatchingService_QueryWorkflow_Result should have exactly one field: got %v fields", count) } @@ -510,7 +541,7 @@ func (v *MatchingService_QueryWorkflow_Result) String() string { return "" } - var fields [6]string + var fields [7]string i := 0 if v.Success != nil { fields[i] = fmt.Sprintf("Success: %v", v.Success) @@ -536,6 +567,10 @@ func (v *MatchingService_QueryWorkflow_Result) String() string { fields[i] = fmt.Sprintf("LimitExceededError: %v", v.LimitExceededError) i++ } + if v.ServiceBusyError != nil { + fields[i] = fmt.Sprintf("ServiceBusyError: %v", v.ServiceBusyError) + i++ + } return fmt.Sprintf("MatchingService_QueryWorkflow_Result{%v}", strings.Join(fields[:i], ", ")) } @@ -563,6 +598,9 @@ func (v *MatchingService_QueryWorkflow_Result) Equals(rhs *MatchingService_Query if !((v.LimitExceededError == nil && rhs.LimitExceededError == nil) || (v.LimitExceededError != nil && rhs.LimitExceededError != nil && v.LimitExceededError.Equals(rhs.LimitExceededError))) { return false } + if !((v.ServiceBusyError == nil && rhs.ServiceBusyError == nil) || (v.ServiceBusyError != nil && rhs.ServiceBusyError != nil && v.ServiceBusyError.Equals(rhs.ServiceBusyError))) { + return false + } return true } diff --git a/.gen/go/matching/matchingservice_respondquerytaskcompleted.go b/.gen/go/matching/matchingservice_respondquerytaskcompleted.go index ab4ea6c9fb0..37d4b88e545 100644 --- a/.gen/go/matching/matchingservice_respondquerytaskcompleted.go +++ b/.gen/go/matching/matchingservice_respondquerytaskcompleted.go @@ -224,6 +224,8 @@ func init() { return true case *shared.LimitExceededError: return true + case *shared.ServiceBusyError: + return true default: return false } @@ -255,6 +257,11 @@ func init() { return nil, errors.New("WrapResponse received non-nil error type with nil value for MatchingService_RespondQueryTaskCompleted_Result.LimitExceededError") } return &MatchingService_RespondQueryTaskCompleted_Result{LimitExceededError: e}, nil + case *shared.ServiceBusyError: + if e == nil { + return nil, errors.New("WrapResponse received non-nil error type with nil value for MatchingService_RespondQueryTaskCompleted_Result.ServiceBusyError") + } + return &MatchingService_RespondQueryTaskCompleted_Result{ServiceBusyError: e}, nil } return nil, err @@ -276,6 +283,10 @@ func init() { err = result.LimitExceededError return } + if result.ServiceBusyError != nil { + err = result.ServiceBusyError + return + } return } @@ -289,6 +300,7 @@ type MatchingService_RespondQueryTaskCompleted_Result struct { InternalServiceError *shared.InternalServiceError `json:"internalServiceError,omitempty"` EntityNotExistError *shared.EntityNotExistsError `json:"entityNotExistError,omitempty"` LimitExceededError *shared.LimitExceededError `json:"limitExceededError,omitempty"` + ServiceBusyError *shared.ServiceBusyError `json:"serviceBusyError,omitempty"` } // ToWire translates a MatchingService_RespondQueryTaskCompleted_Result struct into a Thrift-level intermediate @@ -308,7 +320,7 @@ type MatchingService_RespondQueryTaskCompleted_Result struct { // } func (v *MatchingService_RespondQueryTaskCompleted_Result) ToWire() (wire.Value, error) { var ( - fields [4]wire.Field + fields [5]wire.Field i int = 0 w wire.Value err error @@ -346,6 +358,14 @@ func (v *MatchingService_RespondQueryTaskCompleted_Result) ToWire() (wire.Value, fields[i] = wire.Field{ID: 4, Value: w} i++ } + if v.ServiceBusyError != nil { + w, err = v.ServiceBusyError.ToWire() + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 5, Value: w} + i++ + } if i > 1 { return wire.Value{}, fmt.Errorf("MatchingService_RespondQueryTaskCompleted_Result should have at most one field: got %v fields", i) @@ -407,6 +427,14 @@ func (v *MatchingService_RespondQueryTaskCompleted_Result) FromWire(w wire.Value return err } + } + case 5: + if field.Value.Type() == wire.TStruct { + v.ServiceBusyError, err = _ServiceBusyError_Read(field.Value) + if err != nil { + return err + } + } } } @@ -424,6 +452,9 @@ func (v *MatchingService_RespondQueryTaskCompleted_Result) FromWire(w wire.Value if v.LimitExceededError != nil { count++ } + if v.ServiceBusyError != nil { + count++ + } if count > 1 { return fmt.Errorf("MatchingService_RespondQueryTaskCompleted_Result should have at most one field: got %v fields", count) } @@ -438,7 +469,7 @@ func (v *MatchingService_RespondQueryTaskCompleted_Result) String() string { return "" } - var fields [4]string + var fields [5]string i := 0 if v.BadRequestError != nil { fields[i] = fmt.Sprintf("BadRequestError: %v", v.BadRequestError) @@ -456,6 +487,10 @@ func (v *MatchingService_RespondQueryTaskCompleted_Result) String() string { fields[i] = fmt.Sprintf("LimitExceededError: %v", v.LimitExceededError) i++ } + if v.ServiceBusyError != nil { + fields[i] = fmt.Sprintf("ServiceBusyError: %v", v.ServiceBusyError) + i++ + } return fmt.Sprintf("MatchingService_RespondQueryTaskCompleted_Result{%v}", strings.Join(fields[:i], ", ")) } @@ -477,6 +512,9 @@ func (v *MatchingService_RespondQueryTaskCompleted_Result) Equals(rhs *MatchingS if !((v.LimitExceededError == nil && rhs.LimitExceededError == nil) || (v.LimitExceededError != nil && rhs.LimitExceededError != nil && v.LimitExceededError.Equals(rhs.LimitExceededError))) { return false } + if !((v.ServiceBusyError == nil && rhs.ServiceBusyError == nil) || (v.ServiceBusyError != nil && rhs.ServiceBusyError != nil && v.ServiceBusyError.Equals(rhs.ServiceBusyError))) { + return false + } return true } diff --git a/client/matching/retryableClient.go b/client/matching/retryableClient.go new file mode 100644 index 00000000000..cee9c174ed0 --- /dev/null +++ b/client/matching/retryableClient.go @@ -0,0 +1,158 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package matching + +import ( + "context" + + m "github.com/uber/cadence/.gen/go/matching" + workflow "github.com/uber/cadence/.gen/go/shared" + "github.com/uber/cadence/common/backoff" + "go.uber.org/yarpc" +) + +var _ Client = (*retryableClient)(nil) + +type retryableClient struct { + client Client + policy backoff.RetryPolicy + isRetryable backoff.IsRetryable +} + +// NewRetryableClient creates a new instance of Client with retry policy +func NewRetryableClient(client Client, policy backoff.RetryPolicy, isRetryable backoff.IsRetryable) Client { + return &retryableClient{ + client: client, + policy: policy, + isRetryable: isRetryable, + } +} + +func (c *retryableClient) AddActivityTask( + ctx context.Context, + addRequest *m.AddActivityTaskRequest, + opts ...yarpc.CallOption) error { + op := func() error { + return c.client.AddActivityTask(ctx, addRequest, opts...) + } + + return backoff.Retry(op, c.policy, c.isRetryable) +} + +func (c *retryableClient) AddDecisionTask( + ctx context.Context, + addRequest *m.AddDecisionTaskRequest, + opts ...yarpc.CallOption) error { + + op := func() error { + return c.client.AddDecisionTask(ctx, addRequest, opts...) + } + + return backoff.Retry(op, c.policy, c.isRetryable) +} + +func (c *retryableClient) PollForActivityTask( + ctx context.Context, + pollRequest *m.PollForActivityTaskRequest, + opts ...yarpc.CallOption) (*workflow.PollForActivityTaskResponse, error) { + + var resp *workflow.PollForActivityTaskResponse + op := func() error { + var err error + resp, err = c.client.PollForActivityTask(ctx, pollRequest, opts...) + return err + } + + err := backoff.Retry(op, c.policy, c.isRetryable) + return resp, err +} + +func (c *retryableClient) PollForDecisionTask( + ctx context.Context, + pollRequest *m.PollForDecisionTaskRequest, + opts ...yarpc.CallOption) (*m.PollForDecisionTaskResponse, error) { + + var resp *m.PollForDecisionTaskResponse + op := func() error { + var err error + resp, err = c.client.PollForDecisionTask(ctx, pollRequest, opts...) + return err + } + + err := backoff.Retry(op, c.policy, c.isRetryable) + return resp, err +} + +func (c *retryableClient) QueryWorkflow( + ctx context.Context, + queryRequest *m.QueryWorkflowRequest, + opts ...yarpc.CallOption) (*workflow.QueryWorkflowResponse, error) { + + var resp *workflow.QueryWorkflowResponse + op := func() error { + var err error + resp, err = c.client.QueryWorkflow(ctx, queryRequest, opts...) + return err + } + + err := backoff.Retry(op, c.policy, c.isRetryable) + return resp, err +} + +func (c *retryableClient) RespondQueryTaskCompleted( + ctx context.Context, + request *m.RespondQueryTaskCompletedRequest, + opts ...yarpc.CallOption) error { + + op := func() error { + return c.client.RespondQueryTaskCompleted(ctx, request, opts...) + } + + return backoff.Retry(op, c.policy, c.isRetryable) +} + +func (c *retryableClient) CancelOutstandingPoll( + ctx context.Context, + request *m.CancelOutstandingPollRequest, + opts ...yarpc.CallOption) error { + + op := func() error { + return c.client.CancelOutstandingPoll(ctx, request, opts...) + } + + return backoff.Retry(op, c.policy, c.isRetryable) +} + +func (c *retryableClient) DescribeTaskList( + ctx context.Context, + request *m.DescribeTaskListRequest, + opts ...yarpc.CallOption) (*workflow.DescribeTaskListResponse, error) { + + var resp *workflow.DescribeTaskListResponse + op := func() error { + var err error + resp, err = c.client.DescribeTaskList(ctx, request, opts...) + return err + } + + err := backoff.Retry(op, c.policy, c.isRetryable) + return resp, err +} diff --git a/common/service/dynamicconfig/constants.go b/common/service/dynamicconfig/constants.go index 4d76da382d8..4dfe26d76e1 100644 --- a/common/service/dynamicconfig/constants.go +++ b/common/service/dynamicconfig/constants.go @@ -62,6 +62,7 @@ var keys = map[Key]string{ MatchingIdleTasklistCheckInterval: "matching.idleTasklistCheckInterval", MatchingOutstandingTaskAppendsThreshold: "matching.outstandingTaskAppendsThreshold", MatchingMaxTaskBatchSize: "matching.maxTaskBatchSize", + MatchingRPS: "matching.rps", // history settings HistoryLongPollExpirationInterval: "history.longPollExpirationInterval", @@ -152,6 +153,8 @@ const ( MatchingOutstandingTaskAppendsThreshold // MatchingMaxTaskBatchSize is max batch size for task writer MatchingMaxTaskBatchSize + // MatchingRPS is request rate per second for each matching host + MatchingRPS // key for history diff --git a/common/util.go b/common/util.go index 9e8cce9de6f..7c2203e7fee 100644 --- a/common/util.go +++ b/common/util.go @@ -49,6 +49,10 @@ const ( frontendServiceOperationInitialInterval = 200 * time.Millisecond frontendServiceOperationMaxInterval = 5 * time.Second frontendServiceOperationExpirationInterval = 15 * time.Second + + matchingServiceOperationInitialInterval = 50 * time.Millisecond + matchingServiceOperationMaxInterval = 10 * time.Second + matchingServiceOperationExpirationInterval = 30 * time.Second ) // MergeDictoRight copies the contents of src to dest @@ -119,6 +123,15 @@ func CreateFrontendServiceRetryPolicy() backoff.RetryPolicy { return policy } +// CreatePersistanceRetryPolicy creates a retry policy for persistence layer operations +func CreateMatchingRetryPolicy() backoff.RetryPolicy { + policy := backoff.NewExponentialRetryPolicy(matchingServiceOperationInitialInterval) + policy.SetMaximumInterval(matchingServiceOperationMaxInterval) + policy.SetExpirationInterval(matchingServiceOperationExpirationInterval) + + return policy +} + // IsPersistenceTransientError checks if the error is a transient persistence error func IsPersistenceTransientError(err error) bool { switch err.(type) { @@ -148,6 +161,20 @@ func IsServiceNonRetryableError(err error) bool { return false } +// IsMatchingServiceTransientError checks if the error is a transient error. +func IsMatchingServiceTransientError(err error) bool { + switch err.(type) { + case *workflow.InternalServiceError: + return true + case *workflow.ServiceBusyError: + return true + case *workflow.LimitExceededError: + return true + } + + return false +} + // WorkflowIDToHistoryShard is used to map workflowID to a shardID func WorkflowIDToHistoryShard(workflowID string, numberOfShards int) int { hash := farm.Fingerprint32([]byte(workflowID)) diff --git a/idl/github.com/uber/cadence/matching.thrift b/idl/github.com/uber/cadence/matching.thrift index 45c4fc54f40..fb8a4c78f30 100644 --- a/idl/github.com/uber/cadence/matching.thrift +++ b/idl/github.com/uber/cadence/matching.thrift @@ -107,6 +107,7 @@ service MatchingService { 1: shared.BadRequestError badRequestError, 2: shared.InternalServiceError internalServiceError, 3: shared.LimitExceededError limitExceededError, + 4: shared.ServiceBusyError serviceBusyError, ) /** @@ -118,6 +119,7 @@ service MatchingService { 1: shared.BadRequestError badRequestError, 2: shared.InternalServiceError internalServiceError, 3: shared.LimitExceededError limitExceededError, + 4: shared.ServiceBusyError serviceBusyError, ) /** @@ -154,6 +156,7 @@ service MatchingService { 3: shared.EntityNotExistsError entityNotExistError, 4: shared.QueryFailedError queryFailedError, 5: shared.LimitExceededError limitExceededError, + 6: shared.ServiceBusyError serviceBusyError, ) /** @@ -165,6 +168,7 @@ service MatchingService { 2: shared.InternalServiceError internalServiceError, 3: shared.EntityNotExistsError entityNotExistError, 4: shared.LimitExceededError limitExceededError, + 5: shared.ServiceBusyError serviceBusyError, ) /** @@ -180,6 +184,7 @@ service MatchingService { throws ( 1: shared.BadRequestError badRequestError, 2: shared.InternalServiceError internalServiceError, + 3: shared.ServiceBusyError serviceBusyError, ) /** @@ -191,5 +196,6 @@ service MatchingService { 1: shared.BadRequestError badRequestError, 2: shared.InternalServiceError internalServiceError, 3: shared.EntityNotExistsError entityNotExistError, + 4: shared.ServiceBusyError serviceBusyError, ) } diff --git a/service/frontend/workflowHandler.go b/service/frontend/workflowHandler.go index bd1a818d6c2..44e3cc29885 100644 --- a/service/frontend/workflowHandler.go +++ b/service/frontend/workflowHandler.go @@ -148,6 +148,8 @@ func (wh *WorkflowHandler) Start() error { if err != nil { return err } + wh.matching = matching.NewRetryableClient(wh.matching, common.CreateMatchingRetryPolicy(), + common.IsMatchingServiceTransientError) wh.metricsClient = wh.Service.GetMetricsClient() wh.startWG.Done() return nil diff --git a/service/history/timerQueueActiveProcessor.go b/service/history/timerQueueActiveProcessor.go index 9fb5ef3931c..ae6e721b0f0 100644 --- a/service/history/timerQueueActiveProcessor.go +++ b/service/history/timerQueueActiveProcessor.go @@ -66,13 +66,15 @@ func newTimerQueueActiveProcessor(shard ShardContext, historyService *historyEng // this will trigger a timer gate fire event immediately timerGate.Update(time.Time{}) timerQueueAckMgr := newTimerQueueAckMgr(metrics.TimerActiveQueueProcessorScope, shard, historyService.metricsClient, currentClusterName, logger) + retryableMatchingClient := matching.NewRetryableClient(matchingClient, common.CreateMatchingRetryPolicy(), + common.IsMatchingServiceTransientError) processor := &timerQueueActiveProcessorImpl{ shard: shard, historyService: historyService, cache: historyService.historyCache, timerTaskFilter: timerTaskFilter, logger: logger, - matchingClient: matchingClient, + matchingClient: retryableMatchingClient, metricsClient: historyService.metricsClient, currentClusterName: currentClusterName, timerGate: timerGate, @@ -99,7 +101,10 @@ func newTimerQueueFailoverProcessor(shard ShardContext, historyService *historyE return verifyFailoverActiveTask(logger, domainID, timer.DomainID, timer) } - timerQueueAckMgr := newTimerQueueFailoverAckMgr(shard, historyService.metricsClient, standbyClusterName, minLevel, maxLevel, logger) + timerQueueAckMgr := newTimerQueueFailoverAckMgr(shard, historyService.metricsClient, standbyClusterName, minLevel, + maxLevel, logger) + retryableMatchingClient := matching.NewRetryableClient(matchingClient, common.CreateMatchingRetryPolicy(), + common.IsMatchingServiceTransientError) processor := &timerQueueActiveProcessorImpl{ shard: shard, historyService: historyService, @@ -107,7 +112,7 @@ func newTimerQueueFailoverProcessor(shard ShardContext, historyService *historyE timerTaskFilter: timerTaskFilter, logger: logger, metricsClient: historyService.metricsClient, - matchingClient: matchingClient, + matchingClient: retryableMatchingClient, timerGate: NewLocalTimerGate(), timerQueueProcessorBase: newTimerQueueProcessorBase(metrics.TimerActiveQueueProcessorScope, shard, historyService, timerQueueAckMgr, timeNow, logger), timerQueueAckMgr: timerQueueAckMgr, diff --git a/service/history/transferQueueActiveProcessor.go b/service/history/transferQueueActiveProcessor.go index 2975265009f..96bd09f92b1 100644 --- a/service/history/transferQueueActiveProcessor.go +++ b/service/history/transferQueueActiveProcessor.go @@ -93,13 +93,16 @@ func newTransferQueueActiveProcessor(shard ShardContext, historyService *history return shard.UpdateTransferClusterAckLevel(currentClusterName, ackLevel) } + retryableMatchingClient := matching.NewRetryableClient(matchingClient, common.CreateMatchingRetryPolicy(), + common.IsMatchingServiceTransientError) + processor := &transferQueueActiveProcessorImpl{ currentClusterName: currentClusterName, shard: shard, historyService: historyService, options: options, visibilityManager: visibilityMgr, - matchingClient: matchingClient, + matchingClient: retryableMatchingClient, historyClient: historyClient, logger: logger, metricsClient: historyService.metricsClient, @@ -147,13 +150,16 @@ func newTransferQueueFailoverProcessor(shard ShardContext, historyService *histo return nil } + retryableMatchingClient := matching.NewRetryableClient(matchingClient, common.CreateMatchingRetryPolicy(), + common.IsMatchingServiceTransientError) + processor := &transferQueueActiveProcessorImpl{ currentClusterName: currentClusterName, shard: shard, historyService: historyService, options: options, visibilityManager: visibilityMgr, - matchingClient: matchingClient, + matchingClient: retryableMatchingClient, historyClient: historyClient, logger: logger, metricsClient: historyService.metricsClient, diff --git a/service/matching/handler.go b/service/matching/handler.go index a9fc5ddf95c..c27dcd249c8 100644 --- a/service/matching/handler.go +++ b/service/matching/handler.go @@ -47,9 +47,14 @@ type Handler struct { metricsClient metrics.Client startWG sync.WaitGroup domainCache cache.DomainCache + rateLimiter common.TokenBucket service.Service } +var ( + errMatchingHostThrottle = &gen.ServiceBusyError{Message: "Matching host rps exceeded"} +) + // NewHandler creates a thrift handler for the history service func NewHandler(sVice service.Service, config *Config, taskPersistence persistence.TaskManager, metadataMgr persistence.MetadataManager) *Handler { handler := &Handler{ @@ -57,6 +62,7 @@ func NewHandler(sVice service.Service, config *Config, taskPersistence persisten taskPersistence: taskPersistence, metadataMgr: metadataMgr, config: config, + rateLimiter: common.NewTokenBucket(config.RPS(), common.NewRealTimeSource()), } // prevent us from trying to serve requests before matching engine is started and ready handler.startWG.Add(1) @@ -112,6 +118,11 @@ func (h *Handler) AddActivityTask(ctx context.Context, addRequest *m.AddActivity scope := metrics.MatchingAddActivityTaskScope sw := h.startRequestProfile("AddActivityTask", scope) defer sw.Stop() + + if ok, _ := h.rateLimiter.TryConsume(1); !ok { + return h.handleErr(errMatchingHostThrottle, scope) + } + return h.handleErr(h.engine.AddActivityTask(addRequest), scope) } @@ -120,6 +131,11 @@ func (h *Handler) AddDecisionTask(ctx context.Context, addRequest *m.AddDecision scope := metrics.MatchingAddDecisionTaskScope sw := h.startRequestProfile("AddDecisionTask", scope) defer sw.Stop() + + if ok, _ := h.rateLimiter.TryConsume(1); !ok { + return h.handleErr(errMatchingHostThrottle, scope) + } + return h.handleErr(h.engine.AddDecisionTask(addRequest), scope) } @@ -131,6 +147,10 @@ func (h *Handler) PollForActivityTask(ctx context.Context, sw := h.startRequestProfile("PollForActivityTask", scope) defer sw.Stop() + if ok, _ := h.rateLimiter.TryConsume(1); !ok { + return nil, h.handleErr(errMatchingHostThrottle, scope) + } + response, err := h.engine.PollForActivityTask(ctx, pollRequest) return response, h.handleErr(err, scope) } @@ -143,6 +163,10 @@ func (h *Handler) PollForDecisionTask(ctx context.Context, sw := h.startRequestProfile("PollForDecisionTask", scope) defer sw.Stop() + if ok, _ := h.rateLimiter.TryConsume(1); !ok { + return nil, h.handleErr(errMatchingHostThrottle, scope) + } + response, err := h.engine.PollForDecisionTask(ctx, pollRequest) return response, h.handleErr(err, scope) } @@ -154,6 +178,10 @@ func (h *Handler) QueryWorkflow(ctx context.Context, sw := h.startRequestProfile("QueryWorkflow", scope) defer sw.Stop() + if ok, _ := h.rateLimiter.TryConsume(1); !ok { + return nil, h.handleErr(errMatchingHostThrottle, scope) + } + response, err := h.engine.QueryWorkflow(ctx, queryRequest) return response, h.handleErr(err, scope) } @@ -164,6 +192,9 @@ func (h *Handler) RespondQueryTaskCompleted(ctx context.Context, request *m.Resp sw := h.startRequestProfile("RespondQueryTaskCompleted", scope) defer sw.Stop() + // Count the request in the RPS, but we still accept it even if RPS is exceeded + h.rateLimiter.TryConsume(1) + err := h.engine.RespondQueryTaskCompleted(ctx, request) return h.handleErr(err, scope) } @@ -175,6 +206,9 @@ func (h *Handler) CancelOutstandingPoll(ctx context.Context, sw := h.startRequestProfile("CancelOutstandingPoll", scope) defer sw.Stop() + // Count the request in the RPS, but we still accept it even if RPS is exceeded + h.rateLimiter.TryConsume(1) + err := h.engine.CancelOutstandingPoll(ctx, request) return h.handleErr(err, scope) } @@ -186,6 +220,10 @@ func (h *Handler) DescribeTaskList(ctx context.Context, request *m.DescribeTaskL sw := h.startRequestProfile("DescribeTaskList", scope) defer sw.Stop() + if ok, _ := h.rateLimiter.TryConsume(1); !ok { + return nil, h.handleErr(errMatchingHostThrottle, scope) + } + response, err := h.engine.DescribeTaskList(ctx, request) return response, h.handleErr(err, scope) } @@ -218,6 +256,9 @@ func (h *Handler) handleErr(err error, scope int) error { case *gen.LimitExceededError: h.metricsClient.IncCounter(scope, metrics.CadenceErrLimitExceededCounter) return err + case *gen.ServiceBusyError: + h.metricsClient.IncCounter(scope, metrics.CadenceErrServiceBusyCounter) + return err default: h.metricsClient.IncCounter(scope, metrics.CadenceFailures) return &gen.InternalServiceError{Message: err.Error()} diff --git a/service/matching/service.go b/service/matching/service.go index ef0a959eaca..33fefdc23a2 100644 --- a/service/matching/service.go +++ b/service/matching/service.go @@ -32,6 +32,7 @@ import ( // Config represents configuration for cadence-matching service type Config struct { EnableSyncMatch dynamicconfig.BoolPropertyFnWithTaskListInfoFilters + RPS dynamicconfig.IntPropertyFn // taskListManager configuration RangeSize int64 @@ -51,6 +52,7 @@ type Config struct { func NewConfig(dc *dynamicconfig.Collection) *Config { return &Config{ EnableSyncMatch: dc.GetBoolPropertyFilteredByTaskListInfo(dynamicconfig.MatchingEnableSyncMatch, true), + RPS: dc.GetIntProperty(dynamicconfig.MatchingRPS, 1200), RangeSize: 100000, GetTasksBatchSize: dc.GetIntPropertyFilteredByTaskListInfo(dynamicconfig.MatchingGetTasksBatchSize, 1000), UpdateAckInterval: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.MatchingUpdateAckInterval, 1*time.Minute),