Skip to content

Commit

Permalink
Add retry in some frontend API
Browse files Browse the repository at this point in the history
  • Loading branch information
vancexu committed Mar 24, 2018
1 parent 115a38a commit 5964776
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 16 deletions.
18 changes: 18 additions & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ const (
historyServiceOperationInitialInterval = 50 * time.Millisecond
historyServiceOperationMaxInterval = 10 * time.Second
historyServiceOperationExpirationInterval = 30 * time.Second

frontendServiceOperationInitialInterval = 50 * time.Millisecond
frontendServiceOperationMaxInterval = 5 * time.Second
frontendServiceOperationExpirationInterval = 15 * time.Second
)

// MergeDictoRight copies the contents of src to dest
Expand Down Expand Up @@ -103,6 +107,15 @@ func CreateHistoryServiceRetryPolicy() backoff.RetryPolicy {
return policy
}

// CreateFrontendServiceRetryPolicy creates a retry policy for calls to frontend service
func CreateFrontendServiceRetryPolicy() backoff.RetryPolicy {
policy := backoff.NewExponentialRetryPolicy(frontendServiceOperationInitialInterval)
policy.SetMaximumInterval(frontendServiceOperationMaxInterval)
policy.SetExpirationInterval(frontendServiceOperationExpirationInterval)

return policy
}

// IsPersistenceTransientError checks if the error is a transient persistence error
func IsPersistenceTransientError(err error) bool {
switch err.(type) {
Expand All @@ -113,6 +126,11 @@ func IsPersistenceTransientError(err error) bool {
return false
}

// IsServiceTransientError checks if the error is a retryable error.
func IsServiceTransientError(err error) bool {
return !IsServiceNonRetryableError(err)
}

// IsServiceNonRetryableError checks if the error is a non retryable error.
func IsServiceNonRetryableError(err error) bool {
switch err.(type) {
Expand Down
53 changes: 38 additions & 15 deletions service/frontend/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/uber/cadence/client/history"
"github.com/uber/cadence/client/matching"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/client"
"github.com/uber/cadence/common/logging"
Expand Down Expand Up @@ -105,6 +106,8 @@ var (
errCannotRemoveClustersFromDomain = &gen.BadRequestError{Message: "Cannot remove existing replicated clusters from a domain."}
errActiveClusterNotInClusters = &gen.BadRequestError{Message: "Active cluster is not contained in all clusters."}
errCannotDoDomainFailoverAndUpdate = &gen.BadRequestError{Message: "Cannot set active cluster to current cluster when other paramaters are set."}

frontendServiceRetryPolicy = common.CreateFrontendServiceRetryPolicy()
)

// NewWorkflowHandler creates a thrift handler for the cadence service
Expand Down Expand Up @@ -534,11 +537,18 @@ func (wh *WorkflowHandler) PollForActivityTask(
}

pollerID := uuid.New()
resp, err := wh.matching.PollForActivityTask(ctx, &m.PollForActivityTaskRequest{
DomainUUID: common.StringPtr(domainID),
PollerID: common.StringPtr(pollerID),
PollRequest: pollRequest,
})
var resp *gen.PollForActivityTaskResponse
op := func() error {
var err error
resp, err = wh.matching.PollForActivityTask(ctx, &m.PollForActivityTaskRequest{
DomainUUID: common.StringPtr(domainID),
PollerID: common.StringPtr(pollerID),
PollRequest: pollRequest,
})
return err
}

err = backoff.Retry(op, frontendServiceRetryPolicy, common.IsServiceTransientError)
if err != nil {
err = wh.cancelOutstandingPoll(ctx, err, domainID, persistence.TaskListTypeActivity, pollRequest.TaskList, pollerID)
if err != nil {
Expand Down Expand Up @@ -582,11 +592,18 @@ func (wh *WorkflowHandler) PollForDecisionTask(
wh.Service.GetLogger().Debugf("Poll for decision. DomainName: %v, DomainID: %v", domainName, domainID)

pollerID := uuid.New()
matchingResp, err := wh.matching.PollForDecisionTask(ctx, &m.PollForDecisionTaskRequest{
DomainUUID: common.StringPtr(domainID),
PollerID: common.StringPtr(pollerID),
PollRequest: pollRequest,
})
var matchingResp *m.PollForDecisionTaskResponse
op := func() error {
var err error
matchingResp, err = wh.matching.PollForDecisionTask(ctx, &m.PollForDecisionTaskRequest{
DomainUUID: common.StringPtr(domainID),
PollerID: common.StringPtr(pollerID),
PollRequest: pollRequest,
})
return err
}

err = backoff.Retry(op, frontendServiceRetryPolicy, common.IsServiceTransientError)
if err != nil {
err = wh.cancelOutstandingPoll(ctx, err, domainID, persistence.TaskListTypeDecision, pollRequest.TaskList, pollerID)
if err != nil {
Expand Down Expand Up @@ -619,7 +636,7 @@ func (wh *WorkflowHandler) cancelOutstandingPoll(ctx context.Context, err error,
TaskList: taskList,
PollerID: common.StringPtr(pollerID),
})
// We can do much if this call fails. Just log the error and move on
// We can not do much if this call fails. Just log the error and move on
if err != nil {
wh.Service.GetLogger().Warnf("Failed to cancel outstanding poller. Tasklist: %v, Error: %v,",
taskList.GetName(), err)
Expand Down Expand Up @@ -1769,11 +1786,17 @@ func (wh *WorkflowHandler) DescribeTaskList(ctx context.Context, request *gen.De
return nil, err
}

response, err := wh.matching.DescribeTaskList(ctx, &m.DescribeTaskListRequest{
DomainUUID: common.StringPtr(domainID),
DescRequest: request,
})
var response *gen.DescribeTaskListResponse
op := func() error {
var err error
response, err = wh.matching.DescribeTaskList(ctx, &m.DescribeTaskListRequest{
DomainUUID: common.StringPtr(domainID),
DescRequest: request,
})
return err
}

err = backoff.Retry(op, frontendServiceRetryPolicy, common.IsServiceTransientError)
if err != nil {
return nil, wh.error(err, scope)
}
Expand Down
2 changes: 1 addition & 1 deletion service/history/timerGate.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type (
}

// TimerGateImpl is an timer implementation,
// which basically is an wrrapper of golang's timer and
// which basically is an wrapper of golang's timer and
// additional feature
TimerGateImpl struct {
// the channel which will be used to proxy the fired timer
Expand Down

0 comments on commit 5964776

Please sign in to comment.