Skip to content

Commit

Permalink
Add ActivityScheduleTimeout deduction logic (#822)
Browse files Browse the repository at this point in the history
This commit relaxes validateActivityScheduleAttributes by having it fill in some
of the fields related to schedule timeouts if they are empty.

The logic used is identical to the logic used in AWS SWF's java client.

The logic is as follows:
If ScheduleToClose is present and ScheduleToStart or StartToClose is missing,
we will set what is missing to ScheduleToClose.

If ScheduleToClose is missing but both ScheduleToStart and StartToClose are present,
we will set ScheduleToClose to the sum of ScheduleToStart and StartToClose.
  • Loading branch information
arthurgan authored Jun 8, 2018
1 parent 7154288 commit dccf888
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 12 deletions.
36 changes: 24 additions & 12 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2373,18 +2373,6 @@ func validateActivityScheduleAttributes(attributes *workflow.ScheduleActivityTas
return &workflow.BadRequestError{Message: "ActivityType is not set on decision."}
}

if attributes.StartToCloseTimeoutSeconds == nil || *attributes.StartToCloseTimeoutSeconds <= 0 {
return &workflow.BadRequestError{Message: "A valid StartToCloseTimeoutSeconds is not set on decision."}
}
if attributes.ScheduleToStartTimeoutSeconds == nil || *attributes.ScheduleToStartTimeoutSeconds <= 0 {
return &workflow.BadRequestError{Message: "A valid ScheduleToStartTimeoutSeconds is not set on decision."}
}
if attributes.ScheduleToCloseTimeoutSeconds == nil || *attributes.ScheduleToCloseTimeoutSeconds <= 0 {
return &workflow.BadRequestError{Message: "A valid ScheduleToCloseTimeoutSeconds is not set on decision."}
}
if attributes.HeartbeatTimeoutSeconds == nil || *attributes.HeartbeatTimeoutSeconds < 0 {
return &workflow.BadRequestError{Message: "A valid HeartbeatTimeoutSeconds is not set on decision."}
}
if policy := attributes.RetryPolicy; policy != nil {
if policy.GetInitialIntervalInSeconds() <= 0 {
return &workflow.BadRequestError{Message: "A valid InitialIntervalInSeconds is not set on retry policy."}
Expand All @@ -2397,6 +2385,30 @@ func validateActivityScheduleAttributes(attributes *workflow.ScheduleActivityTas
}
}

// Only attempt to deduce and fill in unspecified timeouts only when all timeouts are non-negative.
if attributes.GetScheduleToCloseTimeoutSeconds() < 0 || attributes.GetScheduleToStartTimeoutSeconds() < 0 ||
attributes.GetStartToCloseTimeoutSeconds() < 0 || attributes.GetHeartbeatTimeoutSeconds() < 0 {
return &workflow.BadRequestError{Message: "A valid timeout may not be negative."}
}

validScheduleToClose := attributes.GetScheduleToCloseTimeoutSeconds() > 0
validScheduleToStart := attributes.GetScheduleToStartTimeoutSeconds() > 0
validStartToClose := attributes.GetStartToCloseTimeoutSeconds() > 0

if validScheduleToClose {
if !validScheduleToStart {
attributes.ScheduleToStartTimeoutSeconds = common.Int32Ptr(attributes.GetScheduleToCloseTimeoutSeconds())
}
if !validStartToClose {
attributes.StartToCloseTimeoutSeconds = common.Int32Ptr(attributes.GetScheduleToCloseTimeoutSeconds())
}
} else if validScheduleToStart && validStartToClose {
attributes.ScheduleToCloseTimeoutSeconds = common.Int32Ptr(attributes.GetScheduleToStartTimeoutSeconds() + attributes.GetStartToCloseTimeoutSeconds())
} else {
// Deduction failed as there's not enough information to fill in missing timeouts.
return &workflow.BadRequestError{Message: "A valid ScheduleToCloseTimeout is not set on decision."}
}

return nil
}

Expand Down
124 changes: 124 additions & 0 deletions service/history/historyEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1058,6 +1058,130 @@ func (s *engineSuite) TestRespondDecisionTaskCompletedBadDecisionAttributes() {
s.IsType(&workflow.BadRequestError{}, err)
}

// This test unit tests the activity schedule timeout validation logic of HistoryEngine's RespondDecisionTaskComplete function.
// An scheduled activity decision has 3 timeouts: ScheduleToClose, ScheduleToStart and StartToClose.
// This test verifies that when either ScheduleToClose or ScheduleToStart and StartToClose are specified,
// HistoryEngine's validateActivityScheduleAttribute will deduce the missing timeout and fill it in
// instead of returning a BadRequest error and only when all three are missing should a BadRequest be returned.
func (s *engineSuite) TestRespondDecisionTaskCompletedSingleActivityScheduledAttribute() {
testIterationVariables := []struct {
scheduleToClose *int32
scheduleToStart *int32
startToClose *int32
heartbeat *int32
expectedScheduleToClose int32
expectedScheduleToStart int32
expectedStartToClose int32
expectError bool
}{
// No ScheduleToClose timeout, will use ScheduleToStart + StartToClose
{nil, common.Int32Ptr(3), common.Int32Ptr(7), nil,
3 + 7, 3, 7, false},
// Has ScheduleToClose timeout but not ScheduleToStart or StartToClose,
// will use ScheduleToClose for ScheduleToStart and StartToClose
{common.Int32Ptr(7), nil, nil, nil,
7, 7, 7, false},
// No ScheduleToClose timeout, ScheduleToStart or StartToClose, expect error return
{nil, nil, nil, nil,
0, 0, 0, true},
// Negative ScheduleToClose, expect error return
{common.Int32Ptr(-1), nil, nil, nil,
0, 0, 0, true},
// Negative ScheduleToStart, expect error return
{nil, common.Int32Ptr(-1), nil, nil,
0, 0, 0, true},
// Negative StartToClose, expect error return
{nil, nil, common.Int32Ptr(-1), nil,
0, 0, 0, true},
// Negative HeartBeat, expect error return
{nil, nil, nil, common.Int32Ptr(-1),
0, 0, 0, true},
}

for _, iVar := range testIterationVariables {
domainID := validDomainID
we := workflow.WorkflowExecution{
WorkflowId: common.StringPtr("wId"),
RunId: common.StringPtr(validRunID),
}
tl := "testTaskList"
taskToken, _ := json.Marshal(&common.TaskToken{
WorkflowID: "wId",
RunID: we.GetRunId(),
ScheduleID: 2,
})
identity := "testIdentity"
executionContext := []byte("context")
input := []byte("input")

msBuilder := newMutableStateBuilder(s.config, bark.NewLoggerFromLogrus(log.New()))
addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 200, identity)
di := addDecisionTaskScheduledEvent(msBuilder)
addDecisionTaskStartedEvent(msBuilder, di.ScheduleID, tl, identity)

decisions := []*workflow.Decision{{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeScheduleActivityTask),
ScheduleActivityTaskDecisionAttributes: &workflow.ScheduleActivityTaskDecisionAttributes{
ActivityId: common.StringPtr("activity1"),
ActivityType: &workflow.ActivityType{Name: common.StringPtr("activity_type1")},
TaskList: &workflow.TaskList{Name: &tl},
Input: input,
ScheduleToCloseTimeoutSeconds: iVar.scheduleToClose,
ScheduleToStartTimeoutSeconds: iVar.scheduleToStart,
StartToCloseTimeoutSeconds: iVar.startToClose,
HeartbeatTimeoutSeconds: iVar.heartbeat,
},
}}

ms := createMutableState(msBuilder)
gwmsResponse := &persistence.GetWorkflowExecutionResponse{State: ms}

s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(gwmsResponse, nil).Once()

if !iVar.expectError {
s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once()
s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(nil).Once()
}

s.mockMetadataMgr.On("GetDomain", mock.Anything).Return(
&persistence.GetDomainResponse{
Info: &persistence.DomainInfo{ID: domainID},
Config: &persistence.DomainConfig{Retention: 1},
ReplicationConfig: &persistence.DomainReplicationConfig{
ActiveClusterName: cluster.TestCurrentClusterName,
Clusters: []*persistence.ClusterReplicationConfig{
&persistence.ClusterReplicationConfig{ClusterName: cluster.TestCurrentClusterName},
},
},
TableVersion: persistence.DomainTableVersionV1,
},
nil,
)
_, err := s.mockHistoryEngine.RespondDecisionTaskCompleted(context.Background(), &history.RespondDecisionTaskCompletedRequest{
DomainUUID: common.StringPtr(domainID),
CompleteRequest: &workflow.RespondDecisionTaskCompletedRequest{
TaskToken: taskToken,
Decisions: decisions,
ExecutionContext: executionContext,
Identity: &identity,
},
})

if !iVar.expectError {
s.Nil(err, s.printHistory(msBuilder))
executionBuilder := s.getBuilder(domainID, we)
activity1Attributes := s.getActivityScheduledEvent(executionBuilder, int64(5)).ActivityTaskScheduledEventAttributes
s.Equal(iVar.expectedScheduleToClose, activity1Attributes.GetScheduleToCloseTimeoutSeconds())
s.Equal(iVar.expectedScheduleToStart, activity1Attributes.GetScheduleToStartTimeoutSeconds())
s.Equal(iVar.expectedStartToClose, activity1Attributes.GetStartToCloseTimeoutSeconds())
} else {
s.NotNil(err)
}
s.TearDownTest()
s.SetupTest()
}
}

func (s *engineSuite) TestRespondDecisionTaskCompletedSingleActivityScheduledDecision() {
domainID := validDomainID
we := workflow.WorkflowExecution{
Expand Down

0 comments on commit dccf888

Please sign in to comment.