Skip to content

Commit

Permalink
Merge branch 'protoc-gen-go' of github.com:vytautas-karpavicius/caden…
Browse files Browse the repository at this point in the history
…ce into protoc-gen-go
  • Loading branch information
vytautas-karpavicius committed Jan 21, 2021
2 parents 3cd4774 + 1b4092e commit 3ae7451
Show file tree
Hide file tree
Showing 36 changed files with 261 additions and 288 deletions.
3 changes: 1 addition & 2 deletions common/ndc/history_resender.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,12 +273,11 @@ func (n *HistoryResenderImpl) getHistory(

logger := n.logger.WithTags(tag.WorkflowRunID(runID))

domainEntry, err := n.domainCache.GetDomainByID(domainID)
domainName, err := n.domainCache.GetDomainName(domainID)
if err != nil {
logger.Error("error getting domain", tag.Error(err))
return nil, err
}
domainName := domainEntry.GetInfo().Name

ctx, cancel := context.WithTimeout(ctx, resendContextTimeout)
defer cancel()
Expand Down
2 changes: 1 addition & 1 deletion common/ndc/history_resender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (s *historyResenderSuite) SetupTest() {
1234,
nil,
)
s.mockDomainCache.EXPECT().GetDomainByID(s.domainID).Return(domainEntry, nil).AnyTimes()
s.mockDomainCache.EXPECT().GetDomainName(s.domainID).Return(s.domainName, nil).AnyTimes()
s.mockDomainCache.EXPECT().GetDomain(s.domainName).Return(domainEntry, nil).AnyTimes()
s.serializer = persistence.NewPayloadSerializer()

Expand Down
100 changes: 50 additions & 50 deletions service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,15 +688,15 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeat(
return nil, wh.error(errDomainNotSet, scope)
}

domainEntry, err := wh.GetDomainCache().GetDomainByID(taskToken.DomainID)
domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
if err != nil {
return nil, wh.error(err, scope)
}

scope, sw := wh.startRequestProfileWithDomain(
metrics.FrontendRecordActivityTaskHeartbeatScope,
domainWrapper{
domain: domainEntry.GetInfo().Name,
domain: domainName,
},
)
defer sw.Stop()
Expand All @@ -705,8 +705,8 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeat(
return nil, errShuttingDown
}

sizeLimitError := wh.config.BlobSizeLimitError(domainEntry.GetInfo().Name)
sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainEntry.GetInfo().Name)
sizeLimitError := wh.config.BlobSizeLimitError(domainName)
sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)

if err := common.CheckEventBlobSizeLimit(
len(heartbeatRequest.Details),
Expand Down Expand Up @@ -803,16 +803,16 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeatByID(
return nil, wh.error(err, scope)
}

domainEntry, err := wh.GetDomainCache().GetDomainByID(taskToken.DomainID)
domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
if err != nil {
return nil, wh.error(err, scope)
}

// add domain tag to scope, so further metrics will have the domain tag
scope = scope.Tagged(metrics.DomainTag(domainEntry.GetInfo().Name))
scope = scope.Tagged(metrics.DomainTag(domainName))

sizeLimitError := wh.config.BlobSizeLimitError(domainEntry.GetInfo().Name)
sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainEntry.GetInfo().Name)
sizeLimitError := wh.config.BlobSizeLimitError(domainName)
sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)

if err := common.CheckEventBlobSizeLimit(
len(heartbeatRequest.Details),
Expand Down Expand Up @@ -889,18 +889,18 @@ func (wh *WorkflowHandler) RespondActivityTaskCompleted(
return wh.error(errDomainNotSet, scope)
}

domainEntry, err := wh.GetDomainCache().GetDomainByID(taskToken.DomainID)
domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
if err != nil {
return wh.error(err, scope)
}
if !wh.validIDLength(completeRequest.GetIdentity(), scope, domainEntry.GetInfo().Name) {
if !wh.validIDLength(completeRequest.GetIdentity(), scope, domainName) {
return wh.error(errIdentityTooLong, scope)
}

scope, sw := wh.startRequestProfileWithDomain(
metrics.FrontendRespondActivityTaskCompletedScope,
domainWrapper{
domain: domainEntry.GetInfo().Name,
domain: domainName,
},
)
defer sw.Stop()
Expand All @@ -909,8 +909,8 @@ func (wh *WorkflowHandler) RespondActivityTaskCompleted(
return errShuttingDown
}

sizeLimitError := wh.config.BlobSizeLimitError(domainEntry.GetInfo().Name)
sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainEntry.GetInfo().Name)
sizeLimitError := wh.config.BlobSizeLimitError(domainName)
sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)

if err := common.CheckEventBlobSizeLimit(
len(completeRequest.Result),
Expand Down Expand Up @@ -1009,16 +1009,16 @@ func (wh *WorkflowHandler) RespondActivityTaskCompletedByID(
return wh.error(err, scope)
}

domainEntry, err := wh.GetDomainCache().GetDomainByID(taskToken.DomainID)
domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
if err != nil {
return wh.error(err, scope)
}

// add domain tag to scope, so further metrics will have the domain tag
scope = scope.Tagged(metrics.DomainTag(domainEntry.GetInfo().Name))
scope = scope.Tagged(metrics.DomainTag(domainName))

sizeLimitError := wh.config.BlobSizeLimitError(domainEntry.GetInfo().Name)
sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainEntry.GetInfo().Name)
sizeLimitError := wh.config.BlobSizeLimitError(domainName)
sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)

if err := common.CheckEventBlobSizeLimit(
len(completeRequest.Result),
Expand Down Expand Up @@ -1094,15 +1094,15 @@ func (wh *WorkflowHandler) RespondActivityTaskFailed(
return wh.error(errDomainNotSet, scope)
}

domainEntry, err := wh.GetDomainCache().GetDomainByID(taskToken.DomainID)
domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
if err != nil {
return wh.error(err, scope)
}

scope, sw := wh.startRequestProfileWithDomain(
metrics.FrontendRespondActivityTaskFailedScope,
domainWrapper{
domain: domainEntry.GetInfo().Name,
domain: domainName,
},
)
defer sw.Stop()
Expand All @@ -1111,12 +1111,12 @@ func (wh *WorkflowHandler) RespondActivityTaskFailed(
return errShuttingDown
}

if !wh.validIDLength(failedRequest.GetIdentity(), scope, domainEntry.GetInfo().Name) {
if !wh.validIDLength(failedRequest.GetIdentity(), scope, domainName) {
return wh.error(errIdentityTooLong, scope)
}

sizeLimitError := wh.config.BlobSizeLimitError(domainEntry.GetInfo().Name)
sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainEntry.GetInfo().Name)
sizeLimitError := wh.config.BlobSizeLimitError(domainName)
sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)

if err := common.CheckEventBlobSizeLimit(
len(failedRequest.Details),
Expand Down Expand Up @@ -1202,16 +1202,16 @@ func (wh *WorkflowHandler) RespondActivityTaskFailedByID(
return wh.error(err, scope)
}

domainEntry, err := wh.GetDomainCache().GetDomainByID(taskToken.DomainID)
domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
if err != nil {
return wh.error(err, scope)
}

// add domain tag to scope, so further metrics will have the domain tag
scope = scope.Tagged(metrics.DomainTag(domainEntry.GetInfo().Name))
scope = scope.Tagged(metrics.DomainTag(domainName))

sizeLimitError := wh.config.BlobSizeLimitError(domainEntry.GetInfo().Name)
sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainEntry.GetInfo().Name)
sizeLimitError := wh.config.BlobSizeLimitError(domainName)
sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)

if err := common.CheckEventBlobSizeLimit(
len(failedRequest.Details),
Expand Down Expand Up @@ -1276,15 +1276,15 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceled(
return wh.error(errDomainNotSet, scope)
}

domainEntry, err := wh.GetDomainCache().GetDomainByID(taskToken.DomainID)
domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
if err != nil {
return wh.error(err, scope)
}

scope, sw := wh.startRequestProfileWithDomain(
metrics.FrontendRespondActivityTaskCanceledScope,
domainWrapper{
domain: domainEntry.GetInfo().Name,
domain: domainName,
},
)
defer sw.Stop()
Expand All @@ -1293,12 +1293,12 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceled(
return errShuttingDown
}

if !wh.validIDLength(cancelRequest.GetIdentity(), scope, domainEntry.GetInfo().Name) {
if !wh.validIDLength(cancelRequest.GetIdentity(), scope, domainName) {
return wh.error(errIdentityTooLong, scope)
}

sizeLimitError := wh.config.BlobSizeLimitError(domainEntry.GetInfo().Name)
sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainEntry.GetInfo().Name)
sizeLimitError := wh.config.BlobSizeLimitError(domainName)
sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)

if err := common.CheckEventBlobSizeLimit(
len(cancelRequest.Details),
Expand Down Expand Up @@ -1396,16 +1396,16 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceledByID(
return wh.error(err, scope)
}

domainEntry, err := wh.GetDomainCache().GetDomainByID(taskToken.DomainID)
domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
if err != nil {
return wh.error(err, scope)
}

// add domain tag to scope, so further metrics will have the domain tag
scope = scope.Tagged(metrics.DomainTag(domainEntry.GetInfo().Name))
scope = scope.Tagged(metrics.DomainTag(domainName))

sizeLimitError := wh.config.BlobSizeLimitError(domainEntry.GetInfo().Name)
sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainEntry.GetInfo().Name)
sizeLimitError := wh.config.BlobSizeLimitError(domainName)
sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)

if err := common.CheckEventBlobSizeLimit(
len(cancelRequest.Details),
Expand Down Expand Up @@ -1481,15 +1481,15 @@ func (wh *WorkflowHandler) RespondDecisionTaskCompleted(
return nil, wh.error(errDomainNotSet, scope)
}

domainEntry, err := wh.GetDomainCache().GetDomainByID(taskToken.DomainID)
domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
if err != nil {
return nil, wh.error(err, scope)
}

scope, sw := wh.startRequestProfileWithDomain(
metrics.FrontendRespondDecisionTaskCompletedScope,
domainWrapper{
domain: domainEntry.GetInfo().Name,
domain: domainName,
},
)
defer sw.Stop()
Expand All @@ -1506,7 +1506,7 @@ func (wh *WorkflowHandler) RespondDecisionTaskCompleted(
return nil, wh.error(err, scope)
}

if !wh.validIDLength(completeRequest.GetIdentity(), scope, domainEntry.GetInfo().Name) {
if !wh.validIDLength(completeRequest.GetIdentity(), scope, domainName) {
return nil, wh.error(errIdentityTooLong, scope)
}

Expand Down Expand Up @@ -1567,15 +1567,15 @@ func (wh *WorkflowHandler) RespondDecisionTaskFailed(
return wh.error(errDomainNotSet, scope)
}

domainEntry, err := wh.GetDomainCache().GetDomainByID(taskToken.DomainID)
domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
if err != nil {
return wh.error(err, scope)
}

scope, sw := wh.startRequestProfileWithDomain(
metrics.FrontendRespondDecisionTaskFailedScope,
domainWrapper{
domain: domainEntry.GetInfo().Name,
domain: domainName,
},
)
defer sw.Stop()
Expand All @@ -1584,12 +1584,12 @@ func (wh *WorkflowHandler) RespondDecisionTaskFailed(
return errShuttingDown
}

if !wh.validIDLength(failedRequest.GetIdentity(), scope, domainEntry.GetInfo().Name) {
if !wh.validIDLength(failedRequest.GetIdentity(), scope, domainName) {
return wh.error(errIdentityTooLong, scope)
}

sizeLimitError := wh.config.BlobSizeLimitError(domainEntry.GetInfo().Name)
sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainEntry.GetInfo().Name)
sizeLimitError := wh.config.BlobSizeLimitError(domainName)
sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)

if err := common.CheckEventBlobSizeLimit(
len(failedRequest.Details),
Expand Down Expand Up @@ -1646,15 +1646,15 @@ func (wh *WorkflowHandler) RespondQueryTaskCompleted(
return wh.error(errInvalidTaskToken, scope)
}

domainEntry, err := wh.GetDomainCache().GetDomainByID(queryTaskToken.DomainID)
domainName, err := wh.GetDomainCache().GetDomainName(queryTaskToken.DomainID)
if err != nil {
return wh.error(err, scope)
}

scope, sw := wh.startRequestProfileWithDomain(
metrics.FrontendRespondQueryTaskCompletedScope,
domainWrapper{
domain: domainEntry.GetInfo().Name,
domain: domainName,
},
)
defer sw.Stop()
Expand All @@ -1663,8 +1663,8 @@ func (wh *WorkflowHandler) RespondQueryTaskCompleted(
return errShuttingDown
}

sizeLimitError := wh.config.BlobSizeLimitError(domainEntry.GetInfo().Name)
sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainEntry.GetInfo().Name)
sizeLimitError := wh.config.BlobSizeLimitError(domainName)
sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)

if err := common.CheckEventBlobSizeLimit(
len(completeRequest.GetQueryResult()),
Expand Down Expand Up @@ -3622,19 +3622,19 @@ func (wh *WorkflowHandler) createPollForDecisionTaskResponse(
if matchingResp.GetStickyExecutionEnabled() {
firstEventID = matchingResp.GetPreviousStartedEventID() + 1
}
domain, dErr := wh.GetDomainCache().GetDomainByID(domainID)
domainName, dErr := wh.GetDomainCache().GetDomainName(domainID)
if dErr != nil {
return nil, dErr
}
scope = scope.Tagged(metrics.DomainTag(domain.GetInfo().Name))
scope = scope.Tagged(metrics.DomainTag(domainName))
history, persistenceToken, err = wh.getHistory(
ctx,
scope,
domainID,
*matchingResp.WorkflowExecution,
firstEventID,
nextEventID,
int32(wh.config.HistoryMaxPageSize(domain.GetInfo().Name)),
int32(wh.config.HistoryMaxPageSize(domainName)),
nil,
matchingResp.DecisionInfo,
branchToken,
Expand Down
4 changes: 2 additions & 2 deletions service/history/decisionChecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,11 +558,11 @@ func (v *decisionAttrValidator) validateContinueAsNewWorkflowExecutionAttributes
return &types.BadRequestError{Message: "BackoffStartInterval is less than 0."}
}

domainEntry, err := v.domainCache.GetDomainByID(executionInfo.DomainID)
domainName, err := v.domainCache.GetDomainName(executionInfo.DomainID)
if err != nil {
return err
}
return v.searchAttributesValidator.ValidateSearchAttributes(attributes.GetSearchAttributes(), domainEntry.GetInfo().Name)
return v.searchAttributesValidator.ValidateSearchAttributes(attributes.GetSearchAttributes(), domainName)
}

func (v *decisionAttrValidator) validateStartChildExecutionAttributes(
Expand Down
6 changes: 2 additions & 4 deletions service/history/decisionTaskHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,11 +748,10 @@ func (handler *decisionTaskHandlerImpl) handleDecisionContinueAsNewWorkflow(
var parentDomainName string
if handler.mutableState.HasParentExecution() {
parentDomainID := executionInfo.ParentDomainID
parentDomainEntry, err := handler.domainCache.GetDomainByID(parentDomainID)
parentDomainName, err = handler.domainCache.GetDomainName(parentDomainID)
if err != nil {
return err
}
parentDomainName = parentDomainEntry.GetInfo().Name
}

_, newStateBuilder, err := handler.mutableState.AddContinueAsNewEvent(
Expand Down Expand Up @@ -905,13 +904,12 @@ func (handler *decisionTaskHandlerImpl) handleDecisionUpsertWorkflowSearchAttrib
// get domain name
executionInfo := handler.mutableState.GetExecutionInfo()
domainID := executionInfo.DomainID
domainEntry, err := handler.domainCache.GetDomainByID(domainID)
domainName, err := handler.domainCache.GetDomainName(domainID)
if err != nil {
return &types.InternalServiceError{
Message: fmt.Sprintf("Unable to get domain for domainID: %v.", domainID),
}
}
domainName := domainEntry.GetInfo().Name

// valid search attributes for upsert
if err := handler.validateDecisionAttr(
Expand Down
Loading

0 comments on commit 3ae7451

Please sign in to comment.