Skip to content

Commit

Permalink
Add speculative workflow task
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed Dec 30, 2022
1 parent adf7c54 commit 2d402a5
Show file tree
Hide file tree
Showing 17 changed files with 411 additions and 276 deletions.
538 changes: 294 additions & 244 deletions api/persistence/v1/executions.pb.go

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions common/log/tag/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,10 @@ func Attempt(attempt int32) ZapTag {
return NewInt32("attempt", attempt)
}

func WorkflowTaskSpeculative(speculative bool) ZapTag {
return NewBoolTag("wt-speculative", speculative)
}

// AttemptCount returns tag for AttemptCount
func AttemptCount(attemptCount int64) ZapTag {
return NewInt64("attempt-count", attemptCount)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,20 @@ message WorkflowExecutionInfo {
int64 last_workflow_task_started_event_id = 19;
google.protobuf.Timestamp start_time = 20 [(gogoproto.stdtime) = true];
google.protobuf.Timestamp last_update_time = 21 [(gogoproto.stdtime) = true];

// Workflow task fields.
int64 workflow_task_version = 22;
int64 workflow_task_scheduled_event_id = 23;
int64 workflow_task_started_event_id = 24;
google.protobuf.Duration workflow_task_timeout = 25 [(gogoproto.stdduration) = true];
int32 workflow_task_attempt = 26;
google.protobuf.Timestamp workflow_task_started_time = 27 [(gogoproto.stdtime) = true];
google.protobuf.Timestamp workflow_task_scheduled_time = 28 [(gogoproto.stdtime) = true];
bool cancel_requested = 29;
google.protobuf.Timestamp workflow_task_original_scheduled_time = 30 [(gogoproto.stdtime) = true];
string workflow_task_request_id = 31;
bool workflow_task_speculative = 68;

bool cancel_requested = 29;
string cancel_request_id = 32;
string sticky_task_queue = 33;
// (-- api-linter: core::0140::prepositions=disabled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func signalWorkflow(

// Create a transfer task to schedule a workflow task
if !mutableState.HasPendingWorkflowTask() {
_, err := mutableState.AddWorkflowTaskScheduledEvent(false)
_, err := mutableState.AddWorkflowTaskScheduledEvent(false, false)
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (s *signalWithStartWorkflowSuite) TestSignalWorkflow_NewWorkflowTask() {
request.GetHeader(),
).Return(&history.HistoryEvent{}, nil)
s.currentMutableState.EXPECT().HasPendingWorkflowTask().Return(false)
s.currentMutableState.EXPECT().AddWorkflowTaskScheduledEvent(false).Return(&workflow.WorkflowTaskInfo{}, nil)
s.currentMutableState.EXPECT().AddWorkflowTaskScheduledEvent(false, false).Return(&workflow.WorkflowTaskInfo{}, nil)
s.currentContext.EXPECT().UpdateWorkflowExecutionAsActive(ctx, gomock.Any()).Return(nil)

err := signalWorkflow(
Expand Down
1 change: 1 addition & 0 deletions service/history/api/update_workflow_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func UpdateWorkflowWithNew(
if !mutableState.HasPendingWorkflowTask() {
if _, err := mutableState.AddWorkflowTaskScheduledEvent(
false,
false,
); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion service/history/historyEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5120,7 +5120,7 @@ func addWorkflowExecutionStartedEvent(ms workflow.MutableState, workflowExecutio
}

func addWorkflowTaskScheduledEvent(ms workflow.MutableState) *workflow.WorkflowTaskInfo {
workflowTask, _ := ms.AddWorkflowTaskScheduledEvent(false)
workflowTask, _ := ms.AddWorkflowTaskScheduledEvent(false, false)
return workflowTask
}

Expand Down
1 change: 1 addition & 0 deletions service/history/ndc/events_reapplier.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ func (r *EventsReapplierImpl) ReapplyEvents(
if !ms.HasPendingWorkflowTask() {
if _, err := ms.AddWorkflowTaskScheduledEvent(
false,
false,
); err != nil {
return nil, err
}
Expand Down
9 changes: 6 additions & 3 deletions service/history/workflow/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ type (
// In this case, OriginalScheduledTime won't change. Then when time.Now().UTC()-OriginalScheduledTime exceeds
// some threshold, server can interrupt the heartbeat by enforcing to time out the workflow task.
OriginalScheduledTime *time.Time

// Indicate if the current workflow task (if any) is speculative.
IsSpeculative bool
}

MutableState interface {
Expand All @@ -107,8 +110,8 @@ type (
AddWorkflowTaskFailedEvent(scheduledEventID int64, startedEventID int64, cause enumspb.WorkflowTaskFailedCause, failure *failurepb.Failure, identity, binChecksum, baseRunID, newRunID string, forkEventVersion int64) (*historypb.HistoryEvent, error)
AddWorkflowTaskScheduleToStartTimeoutEvent(int64) (*historypb.HistoryEvent, error)
AddFirstWorkflowTaskScheduled(*historypb.HistoryEvent) error
AddWorkflowTaskScheduledEvent(bypassTaskGeneration bool) (*WorkflowTaskInfo, error)
AddWorkflowTaskScheduledEventAsHeartbeat(bypassTaskGeneration bool, originalScheduledTimestamp *time.Time) (*WorkflowTaskInfo, error)
AddWorkflowTaskScheduledEvent(bypassTaskGeneration bool, isSpeculativeWorkflowTask bool) (*WorkflowTaskInfo, error)
AddWorkflowTaskScheduledEventAsHeartbeat(bypassTaskGeneration bool, originalScheduledTimestamp *time.Time, isSpeculativeWorkflowTask bool) (*WorkflowTaskInfo, error)
AddWorkflowTaskStartedEvent(int64, string, *taskqueuepb.TaskQueue, string) (*historypb.HistoryEvent, *WorkflowTaskInfo, error)
AddWorkflowTaskTimedOutEvent(int64, int64) (*historypb.HistoryEvent, error)
AddExternalWorkflowExecutionCancelRequested(int64, namespace.Name, namespace.ID, string, string) (*historypb.HistoryEvent, error)
Expand Down Expand Up @@ -214,7 +217,7 @@ type (
ReplicateChildWorkflowExecutionTimedOutEvent(*historypb.HistoryEvent) error
ReplicateWorkflowTaskCompletedEvent(*historypb.HistoryEvent) error
ReplicateWorkflowTaskFailedEvent() error
ReplicateWorkflowTaskScheduledEvent(int64, int64, *taskqueuepb.TaskQueue, *time.Duration, int32, *time.Time, *time.Time) (*WorkflowTaskInfo, error)
ReplicateWorkflowTaskScheduledEvent(int64, int64, *taskqueuepb.TaskQueue, *time.Duration, int32, *time.Time, *time.Time, bool) (*WorkflowTaskInfo, error)
ReplicateWorkflowTaskStartedEvent(*WorkflowTaskInfo, int64, int64, int64, string, time.Time) (*WorkflowTaskInfo, error)
ReplicateWorkflowTaskTimedOutEvent(enumspb.TimeoutType) error
ReplicateExternalWorkflowExecutionCancelRequested(*historypb.HistoryEvent) error
Expand Down
10 changes: 7 additions & 3 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1322,6 +1322,7 @@ func (ms *MutableStateImpl) ClearTransientWorkflowTask() error {

TaskQueue: nil,
OriginalScheduledTime: timestamp.UnixOrZeroTimePtr(0),
IsSpeculative: false,
}
ms.workflowTaskManager.UpdateWorkflowTask(emptyWorkflowTaskInfo)
return nil
Expand Down Expand Up @@ -1719,24 +1720,26 @@ func (ms *MutableStateImpl) AddFirstWorkflowTaskScheduled(

func (ms *MutableStateImpl) AddWorkflowTaskScheduledEvent(
bypassTaskGeneration bool,
isSpeculative bool,
) (*WorkflowTaskInfo, error) {
opTag := tag.WorkflowActionWorkflowTaskScheduled
if err := ms.checkMutability(opTag); err != nil {
return nil, err
}
return ms.workflowTaskManager.AddWorkflowTaskScheduledEvent(bypassTaskGeneration)
return ms.workflowTaskManager.AddWorkflowTaskScheduledEvent(bypassTaskGeneration, isSpeculative)
}

// AddWorkflowTaskScheduledEventAsHeartbeat is to record the first WorkflowTaskScheduledEvent during workflow task heartbeat.
func (ms *MutableStateImpl) AddWorkflowTaskScheduledEventAsHeartbeat(
bypassTaskGeneration bool,
originalScheduledTimestamp *time.Time,
isSpeculative bool,
) (*WorkflowTaskInfo, error) {
opTag := tag.WorkflowActionWorkflowTaskScheduled
if err := ms.checkMutability(opTag); err != nil {
return nil, err
}
return ms.workflowTaskManager.AddWorkflowTaskScheduledEventAsHeartbeat(bypassTaskGeneration, originalScheduledTimestamp)
return ms.workflowTaskManager.AddWorkflowTaskScheduledEventAsHeartbeat(bypassTaskGeneration, originalScheduledTimestamp, isSpeculative)
}

func (ms *MutableStateImpl) ReplicateTransientWorkflowTaskScheduled() (*WorkflowTaskInfo, error) {
Expand All @@ -1751,8 +1754,9 @@ func (ms *MutableStateImpl) ReplicateWorkflowTaskScheduledEvent(
attempt int32,
scheduleTimestamp *time.Time,
originalScheduledTimestamp *time.Time,
isSpeculative bool,
) (*WorkflowTaskInfo, error) {
return ms.workflowTaskManager.ReplicateWorkflowTaskScheduledEvent(version, scheduledEventID, taskQueue, startToCloseTimeout, attempt, scheduleTimestamp, originalScheduledTimestamp)
return ms.workflowTaskManager.ReplicateWorkflowTaskScheduledEvent(version, scheduledEventID, taskQueue, startToCloseTimeout, attempt, scheduleTimestamp, originalScheduledTimestamp, isSpeculative)
}

func (ms *MutableStateImpl) AddWorkflowTaskStartedEvent(
Expand Down
6 changes: 4 additions & 2 deletions service/history/workflow/mutable_state_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ func (s *mutableStateSuite) TestTransientWorkflowTaskSchedule_CurrentVersionChan
})
s.NoError(err)

wt, err := s.mutableState.AddWorkflowTaskScheduledEventAsHeartbeat(true, timestamp.TimeNowPtrUtc())
wt, err := s.mutableState.AddWorkflowTaskScheduledEventAsHeartbeat(true, timestamp.TimeNowPtrUtc(), false)
s.NoError(err)
s.NotNil(wt)

Expand Down Expand Up @@ -437,7 +437,7 @@ func (s *mutableStateSuite) TestTransientWorkflowTaskStart_CurrentVersionChanged
})
s.NoError(err)

wt, err := s.mutableState.AddWorkflowTaskScheduledEventAsHeartbeat(true, timestamp.TimeNowPtrUtc())
wt, err := s.mutableState.AddWorkflowTaskScheduledEventAsHeartbeat(true, timestamp.TimeNowPtrUtc(), false)
s.NoError(err)
s.NotNil(wt)

Expand Down Expand Up @@ -583,6 +583,7 @@ func (s *mutableStateSuite) prepareTransientWorkflowTaskCompletionFirstBatchRepl
workflowTaskScheduleEvent.GetWorkflowTaskScheduledEventAttributes().GetAttempt(),
nil,
nil,
false,
)
s.Nil(err)
s.NotNil(wt)
Expand Down Expand Up @@ -634,6 +635,7 @@ func (s *mutableStateSuite) prepareTransientWorkflowTaskCompletionFirstBatchRepl
newWorkflowTaskScheduleEvent.GetWorkflowTaskScheduledEventAttributes().GetAttempt(),
nil,
nil,
false,
)
s.Nil(err)
s.NotNil(wt)
Expand Down
24 changes: 12 additions & 12 deletions service/history/workflow/mutable_state_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions service/history/workflow/mutable_state_rebuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ func (b *MutableStateRebuilderImpl) ApplyEvents(
attributes.GetAttempt(),
event.GetEventTime(),
event.GetEventTime(),
false, // speculative workflow tasks are not replicated.
)
if err != nil {
return nil, err
Expand Down
3 changes: 2 additions & 1 deletion service/history/workflow/mutable_state_rebuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -756,10 +756,11 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowTaskScheduled() {
WorkflowTaskTimeout: &timeout,
TaskQueue: taskqueue,
Attempt: workflowTaskAttempt,
IsSpeculative: false,
}
s.executionInfo.TaskQueue = taskqueue.GetName()
s.mockMutableState.EXPECT().ReplicateWorkflowTaskScheduledEvent(
event.GetVersion(), event.GetEventId(), taskqueue, &timeout, workflowTaskAttempt, event.GetEventTime(), event.GetEventTime(),
event.GetVersion(), event.GetEventId(), taskqueue, &timeout, workflowTaskAttempt, event.GetEventTime(), event.GetEventTime(), false,
).Return(wt, nil)
s.mockUpdateVersion(event)
s.mockTaskGenerator.EXPECT().GenerateScheduleWorkflowTaskTasks(
Expand Down
2 changes: 1 addition & 1 deletion service/history/workflow/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func ScheduleWorkflowTask(
return nil
}

_, err := mutableState.AddWorkflowTaskScheduledEvent(false)
_, err := mutableState.AddWorkflowTaskScheduledEvent(false, false)
if err != nil {
return serviceerror.NewInternal("Failed to add workflow task scheduled event.")
}
Expand Down
Loading

0 comments on commit 2d402a5

Please sign in to comment.