Skip to content

Commit

Permalink
Handle workflow not exist error when archiving history (#4181)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed May 7, 2021
1 parent 667b7c6 commit cd40ea3
Show file tree
Hide file tree
Showing 10 changed files with 173 additions and 2 deletions.
2 changes: 2 additions & 0 deletions common/archiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type HistoryArchiver interface {
// to interact with these retries including giving the implementor the ability to cancel retries and record progress
// between retry attempts.
// This method will be invoked after a workflow passes its retention period.
// It's possible that this method will be invoked for one workflow multiple times and potentially concurrently,
// implementation should correctly handle the workflow not exist case and return nil error.
Archive(context.Context, URI, *ArchiveHistoryRequest, ...ArchiveOption) error

// Get is used to access an archived history. When context expires method should stop trying to fetch history.
Expand Down
2 changes: 2 additions & 0 deletions common/archiver/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ const (
ArchiveNonRetriableErrorMsg = "Archive method encountered an non-retriable error."
// ArchiveTransientErrorMsg is the log message when the Archive() method encounters a transient error
ArchiveTransientErrorMsg = "Archive method encountered a transient error."
// ArchiveSkippedInfoMsg is the log messsage when the Archive() method encounter an entity not exists error
ArchiveSkippedInfoMsg = "Archive method encountered entity not exists error and skipped the archival"

// ErrReasonInvalidURI is the error reason for invalid URI
ErrReasonInvalidURI = "URI is invalid"
Expand Down
8 changes: 8 additions & 0 deletions common/archiver/filestore/historyArchiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,14 @@ func (h *historyArchiver) Archive(
for historyIterator.HasNext() {
historyBlob, err := getNextHistoryBlob(ctx, historyIterator)
if err != nil {
if common.IsEntityNotExistsError(err) {
// workflow history no longer exists, may due to duplicated archival signal
// this may happen even in the middle of iterating history as two archival signals
// can be processed concurrently.
logger.Info(archiver.ArchiveSkippedInfoMsg)
return nil
}

logger := logger.WithTags(tag.ArchivalArchiveFailReason(archiver.ErrReasonReadHistory), tag.Error(err))
if !persistence.IsTransientError(err) {
logger.Error(archiver.ArchiveNonRetriableErrorMsg)
Expand Down
41 changes: 41 additions & 0 deletions common/archiver/filestore/historyArchiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,47 @@ func (s *historyArchiverSuite) TestArchive_Fail_NonRetriableErrorOption() {
s.Equal(nonRetryableErr, err)
}

func (s *historyArchiverSuite) TestArchive_Skip() {
mockCtrl := gomock.NewController(s.T())
defer mockCtrl.Finish()
historyIterator := archiver.NewMockHistoryIterator(mockCtrl)
historyBlob := &archiver.HistoryBlob{
Header: &archiver.HistoryBlobHeader{
IsLast: common.BoolPtr(false),
},
Body: []*types.History{
{
Events: []*types.HistoryEvent{
{
EventID: common.FirstEventID,
Timestamp: common.Int64Ptr(time.Now().UnixNano()),
Version: testCloseFailoverVersion,
},
},
},
},
}
gomock.InOrder(
historyIterator.EXPECT().HasNext().Return(true),
historyIterator.EXPECT().Next().Return(historyBlob, nil),
historyIterator.EXPECT().HasNext().Return(true),
historyIterator.EXPECT().Next().Return(nil, &types.EntityNotExistsError{Message: "workflow not found"}),
)

historyArchiver := s.newTestHistoryArchiver(historyIterator)
request := &archiver.ArchiveHistoryRequest{
DomainID: testDomainID,
DomainName: testDomainName,
WorkflowID: testWorkflowID,
RunID: testRunID,
BranchToken: testBranchToken,
NextEventID: testNextEventID,
CloseFailoverVersion: testCloseFailoverVersion,
}
err := historyArchiver.Archive(context.Background(), s.testArchivalURI, request)
s.NoError(err)
}

func (s *historyArchiverSuite) TestArchive_Success() {
mockCtrl := gomock.NewController(s.T())
defer mockCtrl.Finish()
Expand Down
9 changes: 9 additions & 0 deletions common/archiver/gcloud/historyArchiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,15 @@ func (h *historyArchiver) Archive(ctx context.Context, URI archiver.URI, request
historyBlob, err := getNextHistoryBlob(ctx, historyIterator)

if err != nil {
if common.IsEntityNotExistsError(err) {
// workflow history no longer exists, may due to duplicated archival signal
// this may happen even in the middle of iterating history as two archival signals
// can be processed concurrently.
logger.Info(archiver.ArchiveSkippedInfoMsg)
scope.IncCounter(metrics.HistoryArchiverDuplicateArchivalsCount)
return nil
}

logger := logger.WithTags(tag.ArchivalArchiveFailReason(archiver.ErrReasonReadHistory), tag.Error(err))
if !persistence.IsTransientError(err) {
logger.Error(archiver.ArchiveNonRetriableErrorMsg)
Expand Down
48 changes: 48 additions & 0 deletions common/archiver/gcloud/historyArchiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,54 @@ func (h *historyArchiverSuite) TestArchive_Fail_NonRetriableErrorOption() {
h.Equal(errUploadNonRetriable, err)
}

func (h *historyArchiverSuite) TestArchive_Skip() {
ctx := context.Background()
mockCtrl := gomock.NewController(h.T())
URI, err := archiver.NewURI("gs://my-bucket-cad/cadence_archival/development")
h.NoError(err)
storageWrapper := &mocks.Client{}
storageWrapper.On("Exist", ctx, URI, mock.Anything).Return(true, nil)
storageWrapper.On("Upload", ctx, URI, mock.Anything, mock.Anything).Return(nil)

defer mockCtrl.Finish()
historyIterator := archiver.NewMockHistoryIterator(mockCtrl)
historyBlob := &archiver.HistoryBlob{
Header: &archiver.HistoryBlobHeader{
IsLast: common.BoolPtr(false),
},
Body: []*types.History{
{
Events: []*types.HistoryEvent{
{
EventID: common.FirstEventID,
Timestamp: common.Int64Ptr(time.Now().UnixNano()),
Version: testCloseFailoverVersion,
},
},
},
},
}
gomock.InOrder(
historyIterator.EXPECT().HasNext().Return(true),
historyIterator.EXPECT().Next().Return(historyBlob, nil),
historyIterator.EXPECT().HasNext().Return(true),
historyIterator.EXPECT().Next().Return(nil, &types.EntityNotExistsError{Message: "workflow not found"}),
)

historyArchiver := newHistoryArchiver(h.container, historyIterator, storageWrapper)
request := &archiver.ArchiveHistoryRequest{
DomainID: testDomainID,
DomainName: testDomainName,
WorkflowID: testWorkflowID,
RunID: testRunID,
BranchToken: testBranchToken,
NextEventID: testNextEventID,
CloseFailoverVersion: testCloseFailoverVersion,
}
err = historyArchiver.Archive(ctx, h.testArchivalURI, request)
h.NoError(err)
}

func (h *historyArchiverSuite) TestArchive_Success() {

ctx := context.Background()
Expand Down
9 changes: 9 additions & 0 deletions common/archiver/s3store/historyArchiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,15 @@ func (h *historyArchiver) Archive(
for historyIterator.HasNext() {
historyBlob, err := getNextHistoryBlob(ctx, historyIterator)
if err != nil {
if common.IsEntityNotExistsError(err) {
// workflow history no longer exists, may due to duplicated archival signal
// this may happen even in the middle of iterating history as two archival signals
// can be processed concurrently.
logger.Info(archiver.ArchiveSkippedInfoMsg)
scope.IncCounter(metrics.HistoryArchiverDuplicateArchivalsCount)
return nil
}

logger := logger.WithTags(tag.ArchivalArchiveFailReason(archiver.ErrReasonReadHistory), tag.Error(err))
if persistence.IsTransientError(err) {
logger.Error(archiver.ArchiveTransientErrorMsg)
Expand Down
46 changes: 46 additions & 0 deletions common/archiver/s3store/historyArchiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,52 @@ func (s *historyArchiverSuite) TestArchive_Fail_NonRetriableErrorOption() {
s.Equal(nonRetryableErr, err)
}

func (s *historyArchiverSuite) TestArchive_Skip() {
mockCtrl := gomock.NewController(s.T())
defer mockCtrl.Finish()
historyIterator := archiver.NewMockHistoryIterator(mockCtrl)
historyBlob := &archiver.HistoryBlob{
Header: &archiver.HistoryBlobHeader{
IsLast: common.BoolPtr(false),
},
Body: []*types.History{
{
Events: []*types.HistoryEvent{
{
EventID: common.FirstEventID,
Timestamp: common.Int64Ptr(time.Now().UnixNano()),
Version: testCloseFailoverVersion,
},
},
},
},
}
gomock.InOrder(
historyIterator.EXPECT().HasNext().Return(true),
historyIterator.EXPECT().Next().Return(historyBlob, nil),
historyIterator.EXPECT().HasNext().Return(true),
historyIterator.EXPECT().Next().Return(nil, &types.EntityNotExistsError{Message: "workflow not found"}),
)

historyArchiver := s.newTestHistoryArchiver(historyIterator)
request := &archiver.ArchiveHistoryRequest{
DomainID: testDomainID,
DomainName: testDomainName,
WorkflowID: testWorkflowID,
RunID: testRunID,
BranchToken: testBranchToken,
NextEventID: testNextEventID,
CloseFailoverVersion: testCloseFailoverVersion,
}
URI, err := archiver.NewURI(testBucketURI + "/TestArchive_Skip")
s.NoError(err)
err = historyArchiver.Archive(context.Background(), URI, request)
s.NoError(err)

expectedkey := constructHistoryKey("", testDomainID, testWorkflowID, testRunID, testCloseFailoverVersion, 0)
s.assertKeyExists(expectedkey)
}

func (s *historyArchiverSuite) TestArchive_Success() {
mockCtrl := gomock.NewController(s.T())
defer mockCtrl.Finish()
Expand Down
4 changes: 2 additions & 2 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1666,6 +1666,7 @@ const (
HistoryArchiverHistoryMutatedCount
HistoryArchiverTotalUploadSize
HistoryArchiverHistorySize
HistoryArchiverDuplicateArchivalsCount

// The following metrics are only used by internal history archiver implemention.
// TODO: move them to internal repo once cadence plugin model is in place.
Expand All @@ -1675,7 +1676,6 @@ const (
HistoryArchiverDeterministicConstructionCheckFailedCount
HistoryArchiverRunningBlobIntegrityCheckCount
HistoryArchiverBlobIntegrityCheckFailedCount
HistoryArchiverDuplicateArchivalsCount

VisibilityArchiverArchiveNonRetryableErrorCount
VisibilityArchiverArchiveTransientErrorCount
Expand Down Expand Up @@ -2129,13 +2129,13 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
HistoryArchiverHistoryMutatedCount: {metricName: "history_archiver_history_mutated", metricType: Counter},
HistoryArchiverTotalUploadSize: {metricName: "history_archiver_total_upload_size", metricType: Timer},
HistoryArchiverHistorySize: {metricName: "history_archiver_history_size", metricType: Timer},
HistoryArchiverDuplicateArchivalsCount: {metricName: "history_archiver_duplicate_archivals", metricType: Counter},
HistoryArchiverBlobExistsCount: {metricName: "history_archiver_blob_exists", metricType: Counter},
HistoryArchiverBlobSize: {metricName: "history_archiver_blob_size", metricType: Timer},
HistoryArchiverRunningDeterministicConstructionCheckCount: {metricName: "history_archiver_running_deterministic_construction_check", metricType: Counter},
HistoryArchiverDeterministicConstructionCheckFailedCount: {metricName: "history_archiver_deterministic_construction_check_failed", metricType: Counter},
HistoryArchiverRunningBlobIntegrityCheckCount: {metricName: "history_archiver_running_blob_integrity_check", metricType: Counter},
HistoryArchiverBlobIntegrityCheckFailedCount: {metricName: "history_archiver_blob_integrity_check_failed", metricType: Counter},
HistoryArchiverDuplicateArchivalsCount: {metricName: "history_archiver_duplicate_archivals", metricType: Counter},
VisibilityArchiverArchiveNonRetryableErrorCount: {metricName: "visibility_archiver_archive_non_retryable_error", metricType: Counter},
VisibilityArchiverArchiveTransientErrorCount: {metricName: "visibility_archiver_archive_transient_error", metricType: Counter},
VisibilityArchiveSuccessCount: {metricName: "visibility_archiver_archive_success", metricType: Counter},
Expand Down
6 changes: 6 additions & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,12 @@ func IsServiceTransientError(err error) bool {
return false
}

// IsEntityNotExistsError checks if the error is an entity not exists error.
func IsEntityNotExistsError(err error) bool {
_, ok := err.(*types.EntityNotExistsError)
return ok
}

// IsServiceBusyError checks if the error is a service busy error.
func IsServiceBusyError(err error) bool {
switch err.(type) {
Expand Down

0 comments on commit cd40ea3

Please sign in to comment.