Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix history paging on looking for WFT Fail #1147

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 18 additions & 9 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,14 +224,19 @@ func (eh *history) IsReplayEvent(event *historypb.HistoryEvent) bool {
return event.GetEventId() <= eh.workflowTask.task.GetPreviousStartedEventId() || isCommandEvent(event.GetEventType())
}

// IsNextWorkflowTaskFailed checks if the workflow task failed or completed. If it did complete returns some information
// on the completed workflow task.
func (eh *history) IsNextWorkflowTaskFailed() (isFailed bool, binaryChecksum string, flags []sdkFlag, err error) {
nextIndex := eh.currentIndex + 1
if nextIndex >= len(eh.loadedEvents) && eh.hasMoreEvents() { // current page ends and there is more pages
// Server can return an empty page so if we need the next event we must keep checking until we either get it
// or know we have no more pages to check
for nextIndex >= len(eh.loadedEvents) && eh.hasMoreEvents() { // current page ends and there is more pages
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ug. One might expect such a loop to be in loadMoreEvents and loadMoreEvents have docs that says "This will not return until it has actually loaded more events or there are no more events". But I guess this way the history iterator is non-blocking.

if err := eh.loadMoreEvents(); err != nil {
return false, "", nil, err
}
}

// If not replaying we should not expect to find any more events
if nextIndex < len(eh.loadedEvents) {
nextEvent := eh.loadedEvents[nextIndex]
nextEventType := nextEvent.GetEventType()
Expand Down Expand Up @@ -393,14 +398,7 @@ OrderEvents:
if isPreloadMarkerEvent(event) {
markers = append(markers, event)
} else if attrs := event.GetWorkflowExecutionUpdateAcceptedEventAttributes(); attrs != nil {
msgs = append(msgs, &protocolpb.Message{
Id: attrs.GetAcceptedRequestMessageId(),
ProtocolInstanceId: attrs.GetProtocolInstanceId(),
SequencingId: &protocolpb.Message_EventId{
EventId: attrs.GetAcceptedRequestSequencingEventId(),
},
Body: protocol.MustMarshalAny(attrs.GetAcceptedRequest()),
})
msgs = append(msgs, inferMessage(attrs))
}
nextEvents = append(nextEvents, event)
}
Expand All @@ -425,6 +423,17 @@ func isPreloadMarkerEvent(event *historypb.HistoryEvent) bool {
return event.GetEventType() == enumspb.EVENT_TYPE_MARKER_RECORDED
}

func inferMessage(attrs *historypb.WorkflowExecutionUpdateAcceptedEventAttributes) *protocolpb.Message {
return &protocolpb.Message{
Id: attrs.GetAcceptedRequestMessageId(),
ProtocolInstanceId: attrs.GetProtocolInstanceId(),
SequencingId: &protocolpb.Message_EventId{
EventId: attrs.GetAcceptedRequestSequencingEventId(),
},
Body: protocol.MustMarshalAny(attrs.GetAcceptedRequest()),
}
}

// newWorkflowTaskHandler returns an implementation of workflow task handler.
func newWorkflowTaskHandler(params workerExecutionParameters, ppMgr pressurePointMgr, registry *registry) WorkflowTaskHandler {
ensureRequiredParams(&params)
Expand Down
119 changes: 119 additions & 0 deletions internal/internal_task_handlers_interfaces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/stretchr/testify/suite"
enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/api/protocol/v1"
"go.temporal.io/api/sdk/v1"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
updatepb "go.temporal.io/api/update/v1"
Expand Down Expand Up @@ -308,3 +309,121 @@ func (s *PollLayerInterfacesTestSuite) TestMessageCommands() {

s.Equal(0, len(msgs))
}

func (s *PollLayerInterfacesTestSuite) TestEmptyPages() {
// Schedule an activity and see if we complete workflow.
taskQueue := "tq1"
testEvents := []*historypb.HistoryEvent{
createTestEventWorkflowExecutionStarted(1, &historypb.WorkflowExecutionStartedEventAttributes{TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}}),
createTestEventWorkflowTaskScheduled(2, &historypb.WorkflowTaskScheduledEventAttributes{TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}}),
createTestEventWorkflowTaskStarted(3),
{
EventId: 4,
EventType: enumspb.EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT,
},
createTestEventWorkflowTaskScheduled(5, &historypb.WorkflowTaskScheduledEventAttributes{TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}}),
createTestEventWorkflowTaskStarted(6),
createTestEventWorkflowTaskCompleted(7, &historypb.WorkflowTaskCompletedEventAttributes{
ScheduledEventId: 5,
StartedEventId: 6,
}),
{
EventId: 8,
EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED,
Attributes: &historypb.HistoryEvent_WorkflowExecutionUpdateAcceptedEventAttributes{
WorkflowExecutionUpdateAcceptedEventAttributes: &historypb.WorkflowExecutionUpdateAcceptedEventAttributes{
ProtocolInstanceId: "test",
AcceptedRequest: &updatepb.Request{},
},
},
},
createTestEventWorkflowTaskScheduled(9, &historypb.WorkflowTaskScheduledEventAttributes{TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}}),
createTestEventWorkflowTaskStarted(10),
}
task := createWorkflowTaskWithQueries(testEvents[0:2], 0, "HelloWorld_Workflow", nil, false)

returnEmptyPage := true
eventID := 2
historyIterator := MockHistoryIterator{
GetNextPageImpl: func() (*historypb.History, error) {
if returnEmptyPage {
returnEmptyPage = false
return &historypb.History{
Events: []*historypb.HistoryEvent{},
}, nil
}
returnEmptyPage = true
eventID += 1
return &historypb.History{
Events: testEvents[eventID-1 : eventID],
}, nil
},
HasNextPageImpl: func() bool {
return !(eventID >= len(testEvents) && returnEmptyPage == false)
},
}

workflowTask := &workflowTask{task: task, historyIterator: historyIterator}
eh := newHistory(workflowTask, nil)

type result struct {
events []*historypb.HistoryEvent
messages []*protocol.Message
}

expectedResults := []result{
{
events: []*historypb.HistoryEvent{
{
EventId: 1,
EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED,
},
{
EventId: 6,
EventType: enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED,
},
},
messages: []*protocol.Message{
{
ProtocolInstanceId: "test",
},
},
},
{
events: []*historypb.HistoryEvent{
{
EventId: 7,
EventType: enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED,
},
{
EventId: 8,
EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED,
},
{
EventId: 10,
EventType: enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED,
},
},
messages: []*protocol.Message{},
},
{
events: []*historypb.HistoryEvent{},
messages: []*protocol.Message{},
},
}

for _, expected := range expectedResults {
result, _, _, _, msgs, err := eh.NextCommandEvents()
s.NoError(err)
s.Equal(len(expected.events), len(result))
for i, event := range result {
s.Equal(expected.events[i].EventId, event.EventId)
s.Equal(expected.events[i].EventType, event.EventType)
}

s.Equal(len(expected.messages), len(msgs))
for i, msg := range msgs {
s.Equal(expected.messages[i].ProtocolInstanceId, msg.ProtocolInstanceId)
}
}
}