Skip to content

Commit

Permalink
Remove history truncation (#1656)
Browse files Browse the repository at this point in the history
Remove history truncation
  • Loading branch information
Quinn-With-Two-Ns authored Oct 3, 2024
1 parent 4e8380c commit cdd3070
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 17 deletions.
48 changes: 48 additions & 0 deletions internal/internal_task_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2688,3 +2688,51 @@ func TestResetIfDestroyedTaskPrep(t *testing.T) {
requireContainsMsgWithID(t, task.Messages, wftNewMsgID)
})
}

func TestHistoryIterator(t *testing.T) {
testEvents := []*historypb.HistoryEvent{
createTestEventWorkflowExecutionStarted(1, &historypb.WorkflowExecutionStartedEventAttributes{TaskQueue: &taskqueuepb.TaskQueue{Name: testWorkflowTaskTaskqueue}}),
createTestEventWorkflowTaskScheduled(2, &historypb.WorkflowTaskScheduledEventAttributes{}),
createTestEventWorkflowTaskStarted(3),
}

nextEvents := []*historypb.HistoryEvent{
createTestEventWorkflowTaskCompleted(4, &historypb.WorkflowTaskCompletedEventAttributes{}),
}

ctx := context.Background()
mockCtrl := gomock.NewController(t)
mockService := workflowservicemock.NewMockWorkflowServiceClient(mockCtrl)
mockService.EXPECT().GetWorkflowExecutionHistory(gomock.Any(), gomock.Any(), gomock.Any()).Return(&workflowservice.GetWorkflowExecutionHistoryResponse{
History: &historypb.History{
Events: testEvents,
},
NextPageToken: []byte("token"),
}, nil)

mockService.EXPECT().GetWorkflowExecutionHistory(gomock.Any(), gomock.Any(), gomock.Any()).Return(&workflowservice.GetWorkflowExecutionHistoryResponse{
History: &historypb.History{
Events: nextEvents,
},
}, nil)

historyIterator := &historyIteratorImpl{
iteratorFunc: newGetHistoryPageFunc(
ctx,
mockService,
"test-namespace",
&commonpb.WorkflowExecution{
WorkflowId: "test-workflow-id",
RunId: "test-run-id",
},
metrics.NopHandler,
"test-task-queue",
),
}

_, err := historyIterator.GetNextPage()
require.NoError(t, err)
_, err = historyIterator.GetNextPage()
require.NoError(t, err)

}
16 changes: 0 additions & 16 deletions internal/internal_task_pollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ type (
nextPageToken []byte
namespace string
service workflowservice.WorkflowServiceClient
maxEventID int64
metricsHandler metrics.Handler
taskQueue string
}
Expand Down Expand Up @@ -870,7 +869,6 @@ func (wtp *workflowTaskPoller) toWorkflowTask(response *workflowservice.PollWork
nextPageToken: response.NextPageToken,
namespace: wtp.namespace,
service: wtp.service,
maxEventID: response.GetStartedEventId(),
metricsHandler: wtp.metricsHandler,
taskQueue: wtp.taskQueueName,
}
Expand All @@ -888,7 +886,6 @@ func (h *historyIteratorImpl) GetNextPage() (*historypb.History, error) {
h.service,
h.namespace,
h.execution,
h.maxEventID,
h.metricsHandler,
h.taskQueue,
)
Expand All @@ -915,7 +912,6 @@ func newGetHistoryPageFunc(
service workflowservice.WorkflowServiceClient,
namespace string,
execution *commonpb.WorkflowExecution,
atWorkflowTaskCompletedEventID int64,
metricsHandler metrics.Handler,
taskQueue string,
) func(nextPageToken []byte) (*historypb.History, []byte, error) {
Expand Down Expand Up @@ -945,18 +941,6 @@ func newGetHistoryPageFunc(
} else {
h = resp.History
}

size := len(h.Events)
if size > 0 && atWorkflowTaskCompletedEventID > 0 &&
h.Events[size-1].GetEventId() > atWorkflowTaskCompletedEventID {
first := h.Events[0].GetEventId() // eventIds start from 1
h.Events = h.Events[:atWorkflowTaskCompletedEventID-first+1]
if h.Events[len(h.Events)-1].GetEventType() != enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED {
return nil, nil, fmt.Errorf("newGetHistoryPageFunc: atWorkflowTaskCompletedEventID(%v) "+
"points to event that is not WorkflowTaskCompleted", atWorkflowTaskCompletedEventID)
}
return h, nil, nil
}
return h, resp.NextPageToken, nil
}
}
Expand Down
1 change: 0 additions & 1 deletion internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1447,7 +1447,6 @@ func (aw *WorkflowReplayer) replayWorkflowHistory(logger log.Logger, service wor
execution: task.WorkflowExecution,
namespace: ReplayNamespace,
service: service,
maxEventID: task.GetStartedEventId(),
taskQueue: taskQueue,
}
cache := NewWorkerCache()
Expand Down

0 comments on commit cdd3070

Please sign in to comment.