From 99dccc07951f5a593faa0304eac35dea592f208b Mon Sep 17 00:00:00 2001 From: Zijian Date: Wed, 17 Apr 2024 12:18:09 -0700 Subject: [PATCH] Add AsDuplicateRequestError function --- common/persistence/errors.go | 13 +++- common/persistence/errors_test.go | 67 +++++++++++++++++++ .../engine/engineimpl/historyEngine.go | 8 +-- 3 files changed, 83 insertions(+), 5 deletions(-) create mode 100644 common/persistence/errors_test.go diff --git a/common/persistence/errors.go b/common/persistence/errors.go index 218c9e48610..e25992e204b 100644 --- a/common/persistence/errors.go +++ b/common/persistence/errors.go @@ -22,7 +22,10 @@ package persistence -import "fmt" +import ( + "errors" + "fmt" +) type ( // TimeoutError is returned when a write operation fails due to a timeout @@ -121,3 +124,11 @@ func (e *DBUnavailableError) Error() string { func (e *TransactionSizeLimitError) Error() string { return e.Msg } + +func AsDuplicateRequestError(err error) (*DuplicateRequestError, bool) { + var e *DuplicateRequestError + if errors.As(err, &e) { + return e, true + } + return nil, false +} diff --git a/common/persistence/errors_test.go b/common/persistence/errors_test.go new file mode 100644 index 00000000000..cb61c4ef2b5 --- /dev/null +++ b/common/persistence/errors_test.go @@ -0,0 +1,67 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package persistence + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestAsDuplicateRequestError(t *testing.T) { + testCases := []struct { + name string + err error + expectedErr *DuplicateRequestError + ok bool + }{ + { + name: "unwrapped", + err: &DuplicateRequestError{RunID: "a"}, + expectedErr: &DuplicateRequestError{RunID: "a"}, + ok: true, + }, + { + name: "wrapped", + err: fmt.Errorf("%w", &DuplicateRequestError{RunID: "b"}), + expectedErr: &DuplicateRequestError{RunID: "b"}, + ok: true, + }, + { + name: "not same type", + err: fmt.Errorf("adasdf"), + ok: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + e, ok := AsDuplicateRequestError(tc.err) + assert.Equal(t, tc.ok, ok) + if ok { + assert.Equal(t, tc.expectedErr, e) + } + }) + } +} diff --git a/service/history/engine/engineimpl/historyEngine.go b/service/history/engine/engineimpl/historyEngine.go index e4890607901..590f1d644bd 100644 --- a/service/history/engine/engineimpl/historyEngine.go +++ b/service/history/engine/engineimpl/historyEngine.go @@ -789,7 +789,7 @@ func (e *historyEngineImpl) startWorkflowHelper( prevLastWriteVersion, persistence.CreateWorkflowRequestModeNew, ) - if t, ok := err.(*persistence.DuplicateRequestError); ok { + if t, ok := persistence.AsDuplicateRequestError(err); ok { if t.RequestType == persistence.WorkflowRequestTypeStart || (isSignalWithStart && t.RequestType == persistence.WorkflowRequestTypeSignal) { return &types.StartWorkflowExecutionResponse{ RunID: t.RunID, @@ -865,7 +865,7 @@ func (e *historyEngineImpl) startWorkflowHelper( t.LastWriteVersion, persistence.CreateWorkflowRequestModeNew, ) - if t, ok := err.(*persistence.DuplicateRequestError); ok { + if t, ok := persistence.AsDuplicateRequestError(err); ok { if t.RequestType == persistence.WorkflowRequestTypeStart || (isSignalWithStart && t.RequestType == persistence.WorkflowRequestTypeSignal) { return &types.StartWorkflowExecutionResponse{ RunID: t.RunID, @@ -2640,7 +2640,7 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution( // We apply the update to execution using optimistic concurrency. If it fails due to a conflict then reload // the history and try the operation again. if err := wfContext.UpdateWorkflowExecutionAsActive(ctx, e.shard.GetTimeSource().Now()); err != nil { - if t, ok := err.(*persistence.DuplicateRequestError); ok { + if t, ok := persistence.AsDuplicateRequestError(err); ok { if t.RequestType == persistence.WorkflowRequestTypeSignal { return &types.StartWorkflowExecutionResponse{RunID: t.RunID}, nil } @@ -3041,7 +3041,7 @@ func (e *historyEngineImpl) ResetWorkflowExecution( nil, request.GetSkipSignalReapply(), ); err != nil { - if t, ok := err.(*persistence.DuplicateRequestError); ok { + if t, ok := persistence.AsDuplicateRequestError(err); ok { if t.RequestType == persistence.WorkflowRequestTypeReset { return &types.ResetWorkflowExecutionResponse{ RunID: t.RunID,