Skip to content

Commit

Permalink
fix issue with the transient decision state machine
Browse files Browse the repository at this point in the history
  • Loading branch information
samarabbas committed Mar 31, 2018
1 parent 18f05fe commit afc11d7
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 93 deletions.
2 changes: 1 addition & 1 deletion service/history/historyReplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (r *historyReplicator) ApplyEvents(request *h.ReplicateEventsRequest) error

case shared.EventTypeDecisionTaskStarted:
attributes := event.DecisionTaskStartedEventAttributes
di := msBuilder.ReplicateDecisionTaskStartedEvent(attributes.GetScheduledEventId(), event.GetEventId(),
di := msBuilder.ReplicateDecisionTaskStartedEvent(nil, attributes.GetScheduledEventId(), event.GetEventId(),
attributes.GetRequestId(), event.GetTimestamp())

decisionScheduleID = di.ScheduleID
Expand Down
188 changes: 96 additions & 92 deletions service/history/mutableStateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -883,6 +883,21 @@ func (e *mutableStateBuilder) AddWorkflowExecutionStartedEventForContinueAsNew(d
return e.AddWorkflowExecutionStartedEvent(domainID, execution, createRequest)
}

func (e *mutableStateBuilder) AddWorkflowExecutionStartedEvent(domainID string, execution workflow.WorkflowExecution,
request *workflow.StartWorkflowExecutionRequest) *workflow.HistoryEvent {
eventID := e.GetNextEventID()
if eventID != firstEventID {
logging.LogInvalidHistoryActionEvent(e.logger, logging.TagValueActionWorkflowStarted, eventID, "")
return nil
}

event := e.hBuilder.AddWorkflowExecutionStartedEvent(request)
e.ReplicateWorkflowExecutionStartedEvent(domainID, execution, request.GetRequestId(),
event.WorkflowExecutionStartedEventAttributes)

return event
}

func (e *mutableStateBuilder) ReplicateWorkflowExecutionStartedEvent(domainID string,
execution workflow.WorkflowExecution, requestID string, event *workflow.WorkflowExecutionStartedEventAttributes) {
e.executionInfo.DomainID = domainID
Expand All @@ -903,36 +918,6 @@ func (e *mutableStateBuilder) ReplicateWorkflowExecutionStartedEvent(domainID st
e.executionInfo.DecisionTimeout = 0
}

func (e *mutableStateBuilder) AddWorkflowExecutionStartedEvent(domainID string, execution workflow.WorkflowExecution,
request *workflow.StartWorkflowExecutionRequest) *workflow.HistoryEvent {
eventID := e.GetNextEventID()
if eventID != firstEventID {
logging.LogInvalidHistoryActionEvent(e.logger, logging.TagValueActionWorkflowStarted, eventID, "")
return nil
}

event := e.hBuilder.AddWorkflowExecutionStartedEvent(request)
e.ReplicateWorkflowExecutionStartedEvent(domainID, execution, request.GetRequestId(),
event.WorkflowExecutionStartedEventAttributes)

return event
}

func (e *mutableStateBuilder) ReplicateDecisionTaskScheduledEvent(scheduleID int64, taskList string,
startToCloseTimeoutSeconds int32) *decisionInfo {
di := &decisionInfo{
ScheduleID: scheduleID,
StartedID: emptyEventID,
RequestID: emptyUUID,
DecisionTimeout: startToCloseTimeoutSeconds,
Tasklist: taskList,
Attempt: e.executionInfo.DecisionAttempt,
}

e.UpdateDecision(di)
return di
}

func (e *mutableStateBuilder) AddDecisionTaskScheduledEvent() *decisionInfo {
if e.HasPendingDecisionTask() {
logging.LogInvalidHistoryActionEvent(e.logger, logging.TagValueActionDecisionTaskScheduled, e.GetNextEventID(),
Expand Down Expand Up @@ -965,18 +950,15 @@ func (e *mutableStateBuilder) AddDecisionTaskScheduledEvent() *decisionInfo {
return e.ReplicateDecisionTaskScheduledEvent(scheduleID, taskList, startToCloseTimeoutSeconds)
}

func (e *mutableStateBuilder) ReplicateDecisionTaskStartedEvent(scheduleID, startedID int64, requestID string,
timestamp int64) *decisionInfo {
di, _ := e.GetPendingDecision(scheduleID)
e.executionInfo.State = persistence.WorkflowStateRunning
// Update mutable decision state
di = &decisionInfo{
func (e *mutableStateBuilder) ReplicateDecisionTaskScheduledEvent(scheduleID int64, taskList string,
startToCloseTimeoutSeconds int32) *decisionInfo {
di := &decisionInfo{
ScheduleID: scheduleID,
StartedID: startedID,
RequestID: requestID,
DecisionTimeout: di.DecisionTimeout,
Attempt: di.Attempt,
Timestamp: timestamp,
StartedID: emptyEventID,
RequestID: emptyUUID,
DecisionTimeout: startToCloseTimeoutSeconds,
Tasklist: taskList,
Attempt: e.executionInfo.DecisionAttempt,
}

e.UpdateDecision(di)
Expand Down Expand Up @@ -1014,10 +996,33 @@ func (e *mutableStateBuilder) AddDecisionTaskStartedEvent(scheduleEventID int64,
timestamp = int64(0)
}

di = e.ReplicateDecisionTaskStartedEvent(scheduleID, startedID, requestID, timestamp)
di = e.ReplicateDecisionTaskStartedEvent(di, scheduleID, startedID, requestID, timestamp)
return event, di
}

func (e *mutableStateBuilder) ReplicateDecisionTaskStartedEvent(di *decisionInfo, scheduleID, startedID int64,
requestID string, timestamp int64) *decisionInfo {
// Replicator calls it with a nil decision info, and it is safe to always lookup the decision in this case as it
// does not have to deal with transient decision case.
if di == nil {
di, _ = e.GetPendingDecision(scheduleID)
}

e.executionInfo.State = persistence.WorkflowStateRunning
// Update mutable decision state
di = &decisionInfo{
ScheduleID: scheduleID,
StartedID: startedID,
RequestID: requestID,
DecisionTimeout: di.DecisionTimeout,
Attempt: di.Attempt,
Timestamp: timestamp,
}

e.UpdateDecision(di)
return di
}

func (e *mutableStateBuilder) createTransientDecisionEvents(di *decisionInfo, identity string) (*workflow.HistoryEvent,
*workflow.HistoryEvent) {
tasklist := e.executionInfo.TaskList
Expand All @@ -1029,12 +1034,6 @@ func (e *mutableStateBuilder) createTransientDecisionEvents(di *decisionInfo, id
return scheduledEvent, startedEvent
}

func (e *mutableStateBuilder) ReplicateDecisionTaskCompletedEvent(scheduleEventID,
startedEventID int64) {
e.BeforeAddDecisionTaskCompletedEvent()
e.AfterAddDecisionTaskCompletedEvent(startedEventID)
}

func (e *mutableStateBuilder) BeforeAddDecisionTaskCompletedEvent() {
// Make sure to delete decision before adding events. Otherwise they are buffered rather than getting appended
e.DeleteDecision()
Expand Down Expand Up @@ -1070,8 +1069,9 @@ func (e *mutableStateBuilder) AddDecisionTaskCompletedEvent(scheduleEventID, sta
return event
}

func (e *mutableStateBuilder) ReplicateDecisionTaskTimedOutEvent(scheduleID, startedID int64) {
e.FailDecision()
func (e *mutableStateBuilder) ReplicateDecisionTaskCompletedEvent(scheduleEventID, startedEventID int64) {
e.BeforeAddDecisionTaskCompletedEvent()
e.AfterAddDecisionTaskCompletedEvent(startedEventID)
}

func (e *mutableStateBuilder) AddDecisionTaskTimedOutEvent(scheduleEventID int64,
Expand All @@ -1095,6 +1095,10 @@ func (e *mutableStateBuilder) AddDecisionTaskTimedOutEvent(scheduleEventID int64
return event
}

func (e *mutableStateBuilder) ReplicateDecisionTaskTimedOutEvent(scheduleID, startedID int64) {
e.FailDecision()
}

func (e *mutableStateBuilder) AddDecisionTaskScheduleToStartTimeoutEvent(scheduleEventID int64) *workflow.HistoryEvent {
if e.executionInfo.DecisionScheduleID != scheduleEventID || e.executionInfo.DecisionStartedID > 0 {
logging.LogInvalidHistoryActionEvent(e.logger, logging.TagValueActionDecisionTaskTimedOut, e.GetNextEventID(),
Expand All @@ -1112,10 +1116,6 @@ func (e *mutableStateBuilder) AddDecisionTaskScheduleToStartTimeoutEvent(schedul
return event
}

func (e *mutableStateBuilder) ReplicateDecisionTaskFailedEvent(scheduleID, startedID int64) {
e.FailDecision()
}

func (e *mutableStateBuilder) AddDecisionTaskFailedEvent(scheduleEventID int64,
startedEventID int64, cause workflow.DecisionTaskFailedCause, details []byte,
identity string) *workflow.HistoryEvent {
Expand All @@ -1138,6 +1138,28 @@ func (e *mutableStateBuilder) AddDecisionTaskFailedEvent(scheduleEventID int64,
return event
}

func (e *mutableStateBuilder) ReplicateDecisionTaskFailedEvent(scheduleID, startedID int64) {
e.FailDecision()
}

func (e *mutableStateBuilder) AddActivityTaskScheduledEvent(decisionCompletedEventID int64,
attributes *workflow.ScheduleActivityTaskDecisionAttributes) (*workflow.HistoryEvent, *persistence.ActivityInfo) {
if ai, ok := e.GetActivityInfo(e.GetNextEventID()); ok {
logging.LogInvalidHistoryActionEvent(e.logger, logging.TagValueActionActivityTaskScheduled, ai.ScheduleID, fmt.Sprintf(
"{Exist: %v, Value: %v}", ok, ai.StartedID))
return nil, nil
}

if attributes.ActivityId == nil {
return nil, nil
}

event := e.hBuilder.AddActivityTaskScheduledEvent(decisionCompletedEventID, attributes)

ai := e.ReplicateActivityTaskScheduledEvent(event)
return event, ai
}

func (e *mutableStateBuilder) ReplicateActivityTaskScheduledEvent(
event *workflow.HistoryEvent) *persistence.ActivityInfo {
attributes := event.ActivityTaskScheduledEventAttributes
Expand Down Expand Up @@ -1197,35 +1219,6 @@ func (e *mutableStateBuilder) ReplicateActivityTaskScheduledEvent(
return ai
}

func (e *mutableStateBuilder) AddActivityTaskScheduledEvent(decisionCompletedEventID int64,
attributes *workflow.ScheduleActivityTaskDecisionAttributes) (*workflow.HistoryEvent, *persistence.ActivityInfo) {
if ai, ok := e.GetActivityInfo(e.GetNextEventID()); ok {
logging.LogInvalidHistoryActionEvent(e.logger, logging.TagValueActionActivityTaskScheduled, ai.ScheduleID, fmt.Sprintf(
"{Exist: %v, Value: %v}", ok, ai.StartedID))
return nil, nil
}

if attributes.ActivityId == nil {
return nil, nil
}

event := e.hBuilder.AddActivityTaskScheduledEvent(decisionCompletedEventID, attributes)

ai := e.ReplicateActivityTaskScheduledEvent(event)
return event, ai
}

func (e *mutableStateBuilder) ReplicateActivityTaskStartedEvent(event *workflow.HistoryEvent) {
attributes := event.ActivityTaskStartedEventAttributes
scheduleID := attributes.GetScheduledEventId()
ai, _ := e.GetActivityInfo(scheduleID)

ai.StartedID = event.GetEventId()
ai.RequestID = attributes.GetRequestId()
ai.StartedTime = time.Unix(0, event.GetTimestamp())
e.updateActivityInfos = append(e.updateActivityInfos, ai)
}

func (e *mutableStateBuilder) AddActivityTaskStartedEvent(ai *persistence.ActivityInfo, scheduleEventID int64,
requestID string, request *workflow.PollForActivityTaskRequest) *workflow.HistoryEvent {
if ai, ok := e.GetActivityInfo(scheduleEventID); !ok || ai.StartedID != emptyEventID {
Expand All @@ -1240,11 +1233,15 @@ func (e *mutableStateBuilder) AddActivityTaskStartedEvent(ai *persistence.Activi
return event
}

func (e *mutableStateBuilder) ReplicateActivityTaskCompletedEvent(event *workflow.HistoryEvent) error {
attributes := event.ActivityTaskCompletedEventAttributes
func (e *mutableStateBuilder) ReplicateActivityTaskStartedEvent(event *workflow.HistoryEvent) {
attributes := event.ActivityTaskStartedEventAttributes
scheduleID := attributes.GetScheduledEventId()
ai, _ := e.GetActivityInfo(scheduleID)

return e.DeleteActivity(scheduleID)
ai.StartedID = event.GetEventId()
ai.RequestID = attributes.GetRequestId()
ai.StartedTime = time.Unix(0, event.GetTimestamp())
e.updateActivityInfos = append(e.updateActivityInfos, ai)
}

func (e *mutableStateBuilder) AddActivityTaskCompletedEvent(scheduleEventID, startedEventID int64,
Expand All @@ -1263,6 +1260,13 @@ func (e *mutableStateBuilder) AddActivityTaskCompletedEvent(scheduleEventID, sta
return event
}

func (e *mutableStateBuilder) ReplicateActivityTaskCompletedEvent(event *workflow.HistoryEvent) error {
attributes := event.ActivityTaskCompletedEventAttributes
scheduleID := attributes.GetScheduledEventId()

return e.DeleteActivity(scheduleID)
}

func (e *mutableStateBuilder) AddActivityTaskFailedEvent(scheduleEventID, startedEventID int64,
request *workflow.RespondActivityTaskFailedRequest) *workflow.HistoryEvent {
if ai, ok := e.GetActivityInfo(scheduleEventID); !ok || ai.StartedID != startedEventID {
Expand Down Expand Up @@ -1347,12 +1351,6 @@ func (e *mutableStateBuilder) AddActivityTaskCanceledEvent(scheduleEventID, star
details, identity)
}

func (e *mutableStateBuilder) ReplicateWorkflowExecutionCompletedEvent(event *workflow.HistoryEvent) {
e.executionInfo.State = persistence.WorkflowStateCompleted
e.executionInfo.CloseStatus = persistence.WorkflowCloseStatusCompleted
e.writeCompletionEventToMutableState(event)
}

func (e *mutableStateBuilder) AddCompletedWorkflowEvent(decisionCompletedEventID int64,
attributes *workflow.CompleteWorkflowExecutionDecisionAttributes) *workflow.HistoryEvent {
if e.executionInfo.State == persistence.WorkflowStateCompleted {
Expand All @@ -1366,6 +1364,12 @@ func (e *mutableStateBuilder) AddCompletedWorkflowEvent(decisionCompletedEventID
return event
}

func (e *mutableStateBuilder) ReplicateWorkflowExecutionCompletedEvent(event *workflow.HistoryEvent) {
e.executionInfo.State = persistence.WorkflowStateCompleted
e.executionInfo.CloseStatus = persistence.WorkflowCloseStatusCompleted
e.writeCompletionEventToMutableState(event)
}

func (e *mutableStateBuilder) AddFailWorkflowEvent(decisionCompletedEventID int64,
attributes *workflow.FailWorkflowExecutionDecisionAttributes) *workflow.HistoryEvent {
if e.executionInfo.State == persistence.WorkflowStateCompleted {
Expand Down

0 comments on commit afc11d7

Please sign in to comment.