Skip to content

Commit

Permalink
Improve archival history mutated error logs and add option to allow a…
Browse files Browse the repository at this point in the history
…rchiving incomplete history
  • Loading branch information
longquanzheng committed Oct 14, 2021
1 parent 88a53b4 commit ff344e6
Show file tree
Hide file tree
Showing 18 changed files with 287 additions and 193 deletions.
7 changes: 4 additions & 3 deletions common/archiver/filestore/historyArchiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,10 @@ func (h *historyArchiver) Archive(
return err
}

if historyMutated(request, historyBlob.Body, *historyBlob.Header.IsLast) {
logger.Error(archiver.ArchiveNonRetriableErrorMsg, tag.ArchivalArchiveFailReason(archiver.ErrReasonHistoryMutated))
return archiver.ErrHistoryMutated
if archiver.IsHistoryMutated(request, historyBlob.Body, *historyBlob.Header.IsLast, logger) {
if !featureCatalog.ArchiveIncompleteHistory() {
return archiver.ErrHistoryMutated
}
}

historyBatches = append(historyBatches, historyBlob.Body...)
Expand Down
16 changes: 0 additions & 16 deletions common/archiver/filestore/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (

"github.com/dgryski/go-farm"

"github.com/uber/cadence/common/archiver"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/common/util"
)
Expand Down Expand Up @@ -133,21 +132,6 @@ func extractCloseFailoverVersion(filename string) (int64, error) {
return strconv.ParseInt(filenameParts[1], 10, 64)
}

func historyMutated(request *archiver.ArchiveHistoryRequest, historyBatches []*types.History, isLast bool) bool {
lastBatch := historyBatches[len(historyBatches)-1].Events
lastEvent := lastBatch[len(lastBatch)-1]
lastFailoverVersion := lastEvent.GetVersion()
if lastFailoverVersion > request.CloseFailoverVersion {
return true
}

if !isLast {
return false
}
lastEventID := lastEvent.GetEventID()
return lastFailoverVersion != request.CloseFailoverVersion || lastEventID+1 != request.NextEventID
}

func contextExpired(ctx context.Context) bool {
select {
case <-ctx.Done():
Expand Down
101 changes: 0 additions & 101 deletions common/archiver/filestore/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/stretchr/testify/suite"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/archiver"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/common/util"
)
Expand Down Expand Up @@ -195,106 +194,6 @@ func (s *UtilSuite) TestExtractCloseFailoverVersion() {
}
}

func (s *UtilSuite) TestHistoryMutated() {
testCases := []struct {
historyBatches []*types.History
request *archiver.ArchiveHistoryRequest
isLast bool
isMutated bool
}{
{
historyBatches: []*types.History{
{
Events: []*types.HistoryEvent{
{
Version: 15,
},
},
},
},
request: &archiver.ArchiveHistoryRequest{
CloseFailoverVersion: 3,
},
isMutated: true,
},
{
historyBatches: []*types.History{
{
Events: []*types.HistoryEvent{
{
EventID: 33,
Version: 10,
},
},
},
{
Events: []*types.HistoryEvent{
{
EventID: 49,
Version: 10,
},
{
EventID: 50,
Version: 10,
},
},
},
},
request: &archiver.ArchiveHistoryRequest{
CloseFailoverVersion: 10,
NextEventID: 34,
},
isLast: true,
isMutated: true,
},
{
historyBatches: []*types.History{
{
Events: []*types.HistoryEvent{
{
Version: 9,
},
},
},
},
request: &archiver.ArchiveHistoryRequest{
CloseFailoverVersion: 10,
},
isLast: true,
isMutated: true,
},
{
historyBatches: []*types.History{
{
Events: []*types.HistoryEvent{
{
EventID: 20,
Version: 10,
},
},
},
{
Events: []*types.HistoryEvent{
{
EventID: 33,
Version: 10,
},
},
},
},
request: &archiver.ArchiveHistoryRequest{
CloseFailoverVersion: 10,
NextEventID: 34,
},
isLast: true,
isMutated: false,
},
}
for _, tc := range testCases {
s.Equal(tc.isMutated, historyMutated(tc.request, tc.historyBatches, tc.isLast))
}
}

func (s *UtilSuite) TestSerializeDeserializeGetHistoryToken() {
token := &getHistoryToken{
CloseFailoverVersion: 101,
Expand Down
29 changes: 10 additions & 19 deletions common/archiver/gcloud/historyArchiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,11 @@ func NewHistoryArchiver(
return nil, err
}

func newHistoryArchiver(container *archiver.HistoryBootstrapContainer, historyIterator archiver.HistoryIterator, storage connector.Client) archiver.HistoryArchiver {
func newHistoryArchiver(
container *archiver.HistoryBootstrapContainer,
historyIterator archiver.HistoryIterator,
storage connector.Client,
) archiver.HistoryArchiver {
return &historyArchiver{
container: container,
gcloudStorage: storage,
Expand Down Expand Up @@ -161,9 +165,11 @@ func (h *historyArchiver) Archive(ctx context.Context, URI archiver.URI, request
return err
}

if historyMutated(request, historyBlob.Body, *historyBlob.Header.IsLast) {
logger.Error(archiver.ArchiveNonRetriableErrorMsg, tag.ArchivalArchiveFailReason(archiver.ErrReasonHistoryMutated))
return archiver.ErrHistoryMutated
if archiver.IsHistoryMutated(request, historyBlob.Body, *historyBlob.Header.IsLast, logger) {
if !featureCatalog.ArchiveIncompleteHistory() {
return archiver.ErrHistoryMutated
}

}

encodedHistoryPart, err := encode(historyBlob.Body)
Expand Down Expand Up @@ -322,21 +328,6 @@ func getNextHistoryBlob(ctx context.Context, historyIterator archiver.HistoryIte
return historyBlob, nil
}

func historyMutated(request *archiver.ArchiveHistoryRequest, historyBatches []*types.History, isLast bool) bool {
lastBatch := historyBatches[len(historyBatches)-1].Events
lastEvent := lastBatch[len(lastBatch)-1]
lastFailoverVersion := lastEvent.GetVersion()
if lastFailoverVersion > request.CloseFailoverVersion {
return true
}

if !isLast {
return false
}
lastEventID := lastEvent.GetEventID()
return lastFailoverVersion != request.CloseFailoverVersion || lastEventID+1 != request.NextEventID
}

func (h *historyArchiver) getHighestVersion(ctx context.Context, URI archiver.URI, request *archiver.GetHistoryRequest) (*int64, *int, *int, error) {

filenames, err := h.gcloudStorage.Query(ctx, URI, constructHistoryFilenamePrefix(request.DomainID, request.WorkflowID, request.RunID))
Expand Down
18 changes: 15 additions & 3 deletions common/archiver/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"errors"

"go.uber.org/cadence/activity"

"github.com/uber/cadence/common/dynamicconfig"
)

type (
Expand All @@ -35,8 +37,9 @@ type (
// ArchiveFeatureCatalog is a collection features for the Archive method of
// History/Visibility Archiver
ArchiveFeatureCatalog struct {
ProgressManager ProgressManager
NonRetriableError NonRetriableError
ProgressManager ProgressManager
NonRetriableError NonRetriableError
ArchiveIncompleteHistory dynamicconfig.BoolPropertyFn
}

// NonRetriableError returns an error indicating archiver has encountered an non-retriable error
Expand All @@ -53,7 +56,9 @@ type (
// GetFeatureCatalog applies all the ArchiveOptions to the catalog and returns the catalog.
// It should be called inside the Archive method.
func GetFeatureCatalog(opts ...ArchiveOption) *ArchiveFeatureCatalog {
catalog := &ArchiveFeatureCatalog{}
catalog := &ArchiveFeatureCatalog{
ArchiveIncompleteHistory: dynamicconfig.GetBoolPropertyFn(false),
}
for _, opt := range opts {
opt(catalog)
}
Expand Down Expand Up @@ -95,3 +100,10 @@ func GetNonRetriableErrorOption(nonRetryableErr error) ArchiveOption {
}
}
}

// GetArchivingIncompleteHistoryOption returns an ArchiveOption so that archiver would archive incomplete history
func GetArchivingIncompleteHistoryOption(allow dynamicconfig.BoolPropertyFn) ArchiveOption {
return func(catalog *ArchiveFeatureCatalog) {
catalog.ArchiveIncompleteHistory = allow
}
}
7 changes: 4 additions & 3 deletions common/archiver/s3store/historyArchiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,10 @@ func (h *historyArchiver) Archive(
return err
}

if historyMutated(request, historyBlob.Body, *historyBlob.Header.IsLast) {
logger.Error(archiver.ArchiveNonRetriableErrorMsg, tag.ArchivalArchiveFailReason(archiver.ErrReasonHistoryMutated))
return archiver.ErrHistoryMutated
if archiver.IsHistoryMutated(request, historyBlob.Body, *historyBlob.Header.IsLast, logger) {
if !featureCatalog.ArchiveIncompleteHistory() {
return archiver.ErrHistoryMutated
}
}

encodedHistoryBlob, err := encode(historyBlob)
Expand Down
15 changes: 0 additions & 15 deletions common/archiver/s3store/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,21 +236,6 @@ func download(ctx context.Context, s3cli s3iface.S3API, URI archiver.URI, key st
return body, nil
}

func historyMutated(request *archiver.ArchiveHistoryRequest, historyBatches []*types.History, isLast bool) bool {
lastBatch := historyBatches[len(historyBatches)-1].Events
lastEvent := lastBatch[len(lastBatch)-1]
lastFailoverVersion := lastEvent.GetVersion()
if lastFailoverVersion > request.CloseFailoverVersion {
return true
}

if !isLast {
return false
}
lastEventID := lastEvent.GetEventID()
return lastFailoverVersion != request.CloseFailoverVersion || lastEventID+1 != request.NextEventID
}

func contextExpired(ctx context.Context) bool {
select {
case <-ctx.Done():
Expand Down
24 changes: 24 additions & 0 deletions common/archiver/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/types"
)

var (
Expand Down Expand Up @@ -150,3 +151,26 @@ func ConvertSearchAttrToBytes(searchAttrStr map[string]string) map[string][]byte
}
return searchAttr
}

func IsHistoryMutated(request *ArchiveHistoryRequest, historyBatches []*types.History, isLast bool, logger log.Logger) (mutated bool) {
lastBatch := historyBatches[len(historyBatches)-1].Events
lastEvent := lastBatch[len(lastBatch)-1]
lastFailoverVersion := lastEvent.GetVersion()
defer func() {
if mutated {
logger.Warn(ArchiveNonRetriableErrorMsg+":history is mutated when during archival",
tag.ArchivalArchiveFailReason(ErrReasonHistoryMutated),
tag.FailoverVersion(lastFailoverVersion),
tag.TokenLastEventID(lastEvent.GetEventID()))
}
}()
if lastFailoverVersion > request.CloseFailoverVersion {
return true
}

if !isLast {
return false
}
lastEventID := lastEvent.GetEventID()
return lastFailoverVersion != request.CloseFailoverVersion || lastEventID+1 != request.NextEventID
}
Loading

0 comments on commit ff344e6

Please sign in to comment.