Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable sanity check for strong idempotency check #6031

Merged
merged 1 commit into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2016,6 +2016,13 @@ const (
// Allowed filters: DomainName
EnableStrongIdempotency

// EnableStrongIdempotencySanityCheck enables the sanity checks for strong idempotency; when enabled, a failed check is returned as an error instead of only being logged
// KeyName: history.enableStrongIdempotencySanityCheck
// Value type: Bool
// Default value: false
// Allowed filters: DomainName
EnableStrongIdempotencySanityCheck

// LastBoolKey must be the last one in this const group
LastBoolKey
)
Expand Down Expand Up @@ -4335,6 +4342,12 @@ var BoolKeys = map[BoolKey]DynamicBool{
Description: "EnableStrongIdempotency enables strong idempotency for APIs",
DefaultValue: false,
},
EnableStrongIdempotencySanityCheck: DynamicBool{
KeyName: "history.enableStrongIdempotencySanityCheck",
Filters: []Filter{DomainName},
Description: "EnableStrongIdempotencySanityCheck enables sanity check for strong idempotency",
DefaultValue: false,
},
}

var FloatKeys = map[FloatKey]DynamicFloat{
Expand Down
6 changes: 4 additions & 2 deletions service/history/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,8 @@ type Config struct {
LargeShardHistoryEventMetricThreshold dynamicconfig.IntPropertyFn
LargeShardHistoryBlobMetricThreshold dynamicconfig.IntPropertyFn

EnableStrongIdempotency dynamicconfig.BoolPropertyFnWithDomainFilter
EnableStrongIdempotency dynamicconfig.BoolPropertyFnWithDomainFilter
EnableStrongIdempotencySanityCheck dynamicconfig.BoolPropertyFnWithDomainFilter

// HostName for machine running the service
HostName string
Expand Down Expand Up @@ -598,7 +599,8 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, s
LargeShardHistoryEventMetricThreshold: dc.GetIntProperty(dynamicconfig.LargeShardHistoryEventMetricThreshold),
LargeShardHistoryBlobMetricThreshold: dc.GetIntProperty(dynamicconfig.LargeShardHistoryBlobMetricThreshold),

EnableStrongIdempotency: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableStrongIdempotency),
EnableStrongIdempotency: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableStrongIdempotency),
EnableStrongIdempotencySanityCheck: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableStrongIdempotencySanityCheck),

HostName: hostname,
}
Expand Down
72 changes: 43 additions & 29 deletions service/history/execution/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,15 +420,17 @@ func (c *contextImpl) CreateWorkflowExecution(
c.Clear()
}
}()
err := validateWorkflowRequestsAndMode(newWorkflow.WorkflowRequests, workflowRequestMode)
if err != nil {
// TODO(CDNC-8519): convert it to an error after verification in production
c.logger.Error("workflow requests and mode validation error", tag.Error(err))
}
domain, errorDomainName := c.shard.GetDomainCache().GetDomainName(c.domainID)
if errorDomainName != nil {
return errorDomainName
}
err := validateWorkflowRequestsAndMode(newWorkflow.WorkflowRequests, workflowRequestMode)
if err != nil {
if c.shard.GetConfig().EnableStrongIdempotencySanityCheck(domain) {
return err
}
c.logger.Error("workflow requests and mode validation error", tag.Error(err))
}
createRequest := &persistence.CreateWorkflowExecutionRequest{
// workflow create mode & prev run ID & version
Mode: createMode,
Expand Down Expand Up @@ -484,6 +486,10 @@ func (c *contextImpl) ConflictResolveWorkflowExecution(
return err
}

domain, errorDomainName := c.shard.GetDomainCache().GetDomainName(c.domainID)
if errorDomainName != nil {
return errorDomainName
}
var persistedBlobs events.PersistedBlobs
resetHistorySize := c.GetHistorySize()
for _, workflowEvents := range resetWorkflowEventsSeq {
Expand Down Expand Up @@ -512,8 +518,10 @@ func (c *contextImpl) ConflictResolveWorkflowExecution(
return err
}
if len(resetWorkflow.WorkflowRequests) != 0 && len(newWorkflow.WorkflowRequests) != 0 {
// TODO(CDNC-8519): convert it to an error after verification in production
c.logger.Error("Workflow reqeusts are only expected to be generated from one workflow for ConflictResolveWorkflowExecution")
if c.shard.GetConfig().EnableStrongIdempotencySanityCheck(domain) {
return &types.InternalServiceError{Message: "workflow requests are only expected to be generated from either reset workflow or continue-as-new workflow for ConflictResolveWorkflowExecution"}
}
c.logger.Error("workflow requests are only expected to be generated from either reset workflow or continue-as-new workflow for ConflictResolveWorkflowExecution", tag.Number(int64(len(resetWorkflow.WorkflowRequests))), tag.NextNumber(int64(len(newWorkflow.WorkflowRequests))))
}
newWorkflowSizeSize := newContext.GetHistorySize()
startEvents := newWorkflowEventsSeq[0]
Expand Down Expand Up @@ -542,8 +550,10 @@ func (c *contextImpl) ConflictResolveWorkflowExecution(
return err
}
if len(currentWorkflow.WorkflowRequests) != 0 {
// TODO(CDNC-8519): convert it to an error after verification in production
c.logger.Error("workflow requests are not expected from current workflow for ConflictResolveWorkflowExecution")
if c.shard.GetConfig().EnableStrongIdempotencySanityCheck(domain) {
return &types.InternalServiceError{Message: "workflow requests are not expected from current workflow for ConflictResolveWorkflowExecution"}
}
c.logger.Error("workflow requests are not expected from current workflow for ConflictResolveWorkflowExecution", tag.Counter(len(currentWorkflow.WorkflowRequests)))
}
currentWorkflowSize := currentContext.GetHistorySize()
for _, workflowEvents := range currentWorkflowEventsSeq {
Expand All @@ -568,10 +578,6 @@ func (c *contextImpl) ConflictResolveWorkflowExecution(
); err != nil {
return err
}
domain, errorDomainName := c.shard.GetDomainCache().GetDomainName(c.domainID)
if errorDomainName != nil {
return errorDomainName
}
resp, err := c.shard.ConflictResolveWorkflowExecution(ctx, &persistence.ConflictResolveWorkflowExecutionRequest{
// RangeID , this is set by shard context
Mode: conflictResolveMode,
Expand Down Expand Up @@ -679,17 +685,19 @@ func (c *contextImpl) UpdateWorkflowExecutionTasks(
Message: "UpdateWorkflowExecutionTask can only be used for persisting new workflow tasks, but found new history events",
}
}
domainName, errorDomainName := c.shard.GetDomainCache().GetDomainName(c.domainID)
if errorDomainName != nil {
return errorDomainName
}
if len(currentWorkflow.WorkflowRequests) != 0 {
// TODO(CDNC-8519): convert it to an error after verification in production
c.logger.Error("UpdateWorkflowExecutionTask can only be used for persisting new workflow tasks, but found new workflow requests")
if c.shard.GetConfig().EnableStrongIdempotencySanityCheck(domainName) {
return &types.InternalServiceError{Message: "UpdateWorkflowExecutionTask can only be used for persisting new workflow tasks, but found new workflow requests"}
}
c.logger.Error("UpdateWorkflowExecutionTask can only be used for persisting new workflow tasks, but found new workflow requests", tag.Counter(len(currentWorkflow.WorkflowRequests)))
}
currentWorkflow.ExecutionStats = &persistence.ExecutionStats{
HistorySize: c.GetHistorySize(),
}
domainName, errorDomainName := c.shard.GetDomainCache().GetDomainName(c.domainID)
if errorDomainName != nil {
return errorDomainName
}
resp, err := c.updateWorkflowExecutionFn(ctx, &persistence.UpdateWorkflowExecutionRequest{
// RangeID , this is set by shard context
Mode: persistence.UpdateWorkflowModeIgnoreCurrent,
Expand Down Expand Up @@ -729,9 +737,15 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew(
if err != nil {
return err
}
domain, errorDomainName := c.shard.GetDomainCache().GetDomainName(c.domainID)
if errorDomainName != nil {
return errorDomainName
}
err = validateWorkflowRequestsAndMode(currentWorkflow.WorkflowRequests, workflowRequestMode)
if err != nil {
// TODO(CDNC-8519): convert it to an error after verification in production
if c.shard.GetConfig().EnableStrongIdempotencySanityCheck(domain) {
return err
}
c.logger.Error("workflow requests and mode validation error", tag.Error(err))
}
var persistedBlobs events.PersistedBlobs
Expand Down Expand Up @@ -769,13 +783,17 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew(
return err
}
if len(newWorkflow.WorkflowRequests) != 0 && len(currentWorkflow.WorkflowRequests) != 0 {
// TODO(CDNC-8519): convert it to an error after verification in production
c.logger.Error("Workflow reqeusts are only expected to be generated from one workflow for UpdateWorkflowExecution")
if c.shard.GetConfig().EnableStrongIdempotencySanityCheck(domain) {
return &types.InternalServiceError{Message: "workflow requests are only expected to be generated from one workflow for UpdateWorkflowExecution"}
}
c.logger.Error("workflow requests are only expected to be generated from one workflow for UpdateWorkflowExecution", tag.Number(int64(len(currentWorkflow.WorkflowRequests))), tag.NextNumber(int64(len(newWorkflow.WorkflowRequests))))
}

err := validateWorkflowRequestsAndMode(newWorkflow.WorkflowRequests, workflowRequestMode)
if err != nil {
// TODO(CDNC-8519): convert it to an error after verification in production
if c.shard.GetConfig().EnableStrongIdempotencySanityCheck(domain) {
return err
}
c.logger.Error("workflow requests and mode validation error", tag.Error(err))
}
newWorkflowSizeSize := newContext.GetHistorySize()
Expand Down Expand Up @@ -810,10 +828,6 @@ func (c *contextImpl) UpdateWorkflowExecutionWithNew(
if err := c.updateWorkflowExecutionEventReapplyFn(updateMode, currentWorkflowEventsSeq, newWorkflowEventsSeq); err != nil {
return err
}
domain, errorDomainName := c.shard.GetDomainCache().GetDomainName(c.domainID)
if errorDomainName != nil {
return errorDomainName
}
resp, err := c.updateWorkflowExecutionFn(ctx, &persistence.UpdateWorkflowExecutionRequest{
// RangeID , this is set by shard context
Mode: updateMode,
Expand Down Expand Up @@ -1412,14 +1426,14 @@ func validateWorkflowRequestsAndMode(requests []*persistence.WorkflowRequest, mo
return nil
}
if len(requests) > 2 {
return &types.InternalServiceError{Message: "Too many workflow requests for a single API request."}
return &types.InternalServiceError{Message: "too many workflow request entities generated from a single API request"}
} else if len(requests) == 2 {
// SignalWithStartWorkflow API can generate 2 workflow requests
if (requests[0].RequestType == persistence.WorkflowRequestTypeStart && requests[1].RequestType == persistence.WorkflowRequestTypeSignal) ||
(requests[1].RequestType == persistence.WorkflowRequestTypeStart && requests[0].RequestType == persistence.WorkflowRequestTypeSignal) {
return nil
}
return &types.InternalServiceError{Message: "Too many workflow requests for a single API request."}
return &types.InternalServiceError{Message: "too many workflow request entities generated from a single API request"}
}
return nil
}
Loading
Loading