Skip to content

Commit

Permalink
Immediate replication task hydration after successful transaction (#4980
Browse files Browse the repository at this point in the history
)

* Immediate replication task hydration after successful transaction

* Regenerate mocks

* Fixing tests

* More test fixes

* Test fixes

* More comments and unit tests

* Minor

* Clean commit

* Pass isRunning field to immediateMutableState

* Addressing review comments
  • Loading branch information
vytautas-karpavicius authored Sep 2, 2022
1 parent ae1e0a0 commit 3362f85
Show file tree
Hide file tree
Showing 21 changed files with 546 additions and 91 deletions.
17 changes: 17 additions & 0 deletions common/persistence/workflowExecutionInfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,23 @@ func (e *WorkflowExecutionInfo) UpdateWorkflowStateCloseStatus(

}

func (e *WorkflowExecutionInfo) IsRunning() bool {
switch e.State {
case WorkflowStateCreated:
return true
case WorkflowStateRunning:
return true
case WorkflowStateCompleted:
return false
case WorkflowStateZombie:
return false
case WorkflowStateCorrupted:
return false
default:
panic(fmt.Sprintf("unknown workflow state: %v", e.State))
}
}

// UpdateWorkflowStateCloseStatus update the workflow state
func (e *WorkflowExecutionInfo) createInvalidStateTransitionErr(
currentState int,
Expand Down
4 changes: 4 additions & 0 deletions service/history/common/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@ package common

import (
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/service/history/events"
)

type (
// NotifyTaskInfo defines the info of task notification
NotifyTaskInfo struct {
ExecutionInfo *persistence.WorkflowExecutionInfo
Tasks []persistence.Task
VersionHistories *persistence.VersionHistories
Activities map[int64]*persistence.ActivityInfo
History events.PersistedBlobs
PersistenceError bool
}
)
1 change: 1 addition & 0 deletions service/history/engine/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,5 +83,6 @@ type (
NotifyNewTransferTasks(info *hcommon.NotifyTaskInfo)
NotifyNewTimerTasks(info *hcommon.NotifyTaskInfo)
NotifyNewCrossClusterTasks(info *hcommon.NotifyTaskInfo)
NotifyNewReplicationTasks(info *hcommon.NotifyTaskInfo)
}
)
12 changes: 12 additions & 0 deletions service/history/engine/interface_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

53 changes: 53 additions & 0 deletions service/history/events/blob.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// The MIT License (MIT)

// Copyright (c) 2022 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package events

import (
"bytes"

"github.com/uber/cadence/common/persistence"
)

type (
// PersistedBlob is a wrapper on persistence.DataBlob with additional field indicating what was persisted.
// Additional fields are used as an identification key among other blobs.
PersistedBlob struct {
persistence.DataBlob

BranchToken []byte
FirstEventID int64
}
// PersistedBlobs is a slice of PersistedBlob
PersistedBlobs []PersistedBlob
)

// Find searches for persisted event blob. Returns nil when not found.
func (blobs PersistedBlobs) Find(branchToken []byte, firstEventID int64) *persistence.DataBlob {
// Linear search is ok here, as we will only have 1-2 persisted blobs per transaction
for _, blob := range blobs {
if bytes.Equal(blob.BranchToken, branchToken) && blob.FirstEventID == firstEventID {
return &blob.DataBlob
}
}
return nil
}
49 changes: 49 additions & 0 deletions service/history/events/blob_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// The MIT License (MIT)

// Copyright (c) 2022 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package events

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/uber/cadence/common/persistence"
)

func TestPersistedBlobs_Find(t *testing.T) {
blob1 := persistence.DataBlob{Data: []byte{1, 2, 3}}
blob2 := persistence.DataBlob{Data: []byte{4, 5, 6}}
blob3 := persistence.DataBlob{Data: []byte{7, 8, 9}}
branchA := []byte{11, 11, 11}
branchB := []byte{22, 22, 22}
persistedBlobs := PersistedBlobs{
PersistedBlob{BranchToken: branchA, FirstEventID: 100, DataBlob: blob1},
PersistedBlob{BranchToken: branchA, FirstEventID: 105, DataBlob: blob2},
PersistedBlob{BranchToken: branchB, FirstEventID: 100, DataBlob: blob3},
}
assert.Equal(t, blob1, *persistedBlobs.Find(branchA, 100))
assert.Equal(t, blob2, *persistedBlobs.Find(branchA, 105))
assert.Equal(t, blob3, *persistedBlobs.Find(branchB, 100))
assert.Nil(t, persistedBlobs.Find(branchB, 105))
assert.Nil(t, persistedBlobs.Find([]byte{99}, 100))
}
Loading

0 comments on commit 3362f85

Please sign in to comment.