Skip to content

Commit

Permalink
Store mutable state checksum in SQL storage (#5649)
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll authored Feb 12, 2024
1 parent aa294ed commit d3bff7d
Show file tree
Hide file tree
Showing 20 changed files with 279 additions and 25 deletions.
132 changes: 128 additions & 4 deletions .gen/go/sqlblobs/sqlblobs.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion cmd/server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ require (
github.com/startreedata/pinot-client-go v0.0.0-20230303070132-3b84c28a9e95 // latest release doesn't support pinot v0.12, so use master branch
github.com/stretchr/testify v1.8.3
github.com/uber-go/tally v3.3.15+incompatible // indirect
github.com/uber/cadence-idl v0.0.0-20240206193658-7cafd96fa80e
github.com/uber/cadence-idl v0.0.0-20240208213432-2e3c661bfb7c
github.com/uber/ringpop-go v0.8.5 // indirect
github.com/uber/tchannel-go v1.22.2 // indirect
github.com/urfave/cli v1.22.4
Expand Down
4 changes: 2 additions & 2 deletions cmd/server/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,8 @@ github.com/uber-go/tally v3.3.12+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyu
github.com/uber-go/tally v3.3.15+incompatible h1:9hLSgNBP28CjIaDmAuRTq9qV+UZY+9PcvAkXO4nNMwg=
github.com/uber-go/tally v3.3.15+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyufu1cEi0jdVnRdxvjnmU=
github.com/uber/cadence-idl v0.0.0-20211111101836-d6b70b60eb8c/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/cadence-idl v0.0.0-20240206193658-7cafd96fa80e h1:/Ie1HeCKuIrCMiMqf9I1TxETqUt2x6jeo5aw0+j2yDw=
github.com/uber/cadence-idl v0.0.0-20240206193658-7cafd96fa80e/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/cadence-idl v0.0.0-20240208213432-2e3c661bfb7c h1:PSZBXcnbY0+cDeAZ9GocEEJgX+ZBvLYDL7qdes9/bfg=
github.com/uber/cadence-idl v0.0.0-20240208213432-2e3c661bfb7c/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/jaeger-client-go v2.22.1+incompatible h1:NHcubEkVbahf9t3p75TOCR83gdUHXjRJvjoBh1yACsM=
github.com/uber/jaeger-client-go v2.22.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw=
Expand Down
11 changes: 8 additions & 3 deletions common/persistence/dataStoreInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,10 @@ type (
SignalRequestedIDs map[string]struct{}
BufferedEvents []*DataBlob

Checksum checksum.Checksum
// Checksum field is used by Cassandra storage
// ChecksumData is used by All SQL storage
Checksum checksum.Checksum
ChecksumData *DataBlob
}

// InternalActivityInfo details for Persistence Interface
Expand Down Expand Up @@ -465,7 +468,8 @@ type (

Condition int64

Checksum checksum.Checksum
Checksum checksum.Checksum
ChecksumData *DataBlob
}

// InternalWorkflowSnapshot is used as generic workflow execution state snapshot for Persistence Interface
Expand All @@ -489,7 +493,8 @@ type (

Condition int64

Checksum checksum.Checksum
Checksum checksum.Checksum
ChecksumData *DataBlob
}

// InternalAppendHistoryEventsRequest is used to append new events to workflow execution history for Persistence Interface
Expand Down
26 changes: 22 additions & 4 deletions common/persistence/executionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ func (m *executionManagerImpl) GetWorkflowExecution(
newResponse.State.VersionHistories = versionHistories
newResponse.MutableStateStats = m.statsComputer.computeMutableStateStats(response)

if len(newResponse.State.Checksum.Value) == 0 {
newResponse.State.Checksum, err = m.serializer.DeserializeChecksum(response.State.ChecksumData)
if err != nil {
return nil, err
}
}

return newResponse, nil
}

Expand Down Expand Up @@ -635,6 +642,10 @@ func (m *executionManagerImpl) SerializeWorkflowMutation(
if err != nil {
return nil, err
}
checksumData, err := m.serializer.SerializeChecksum(input.Checksum, common.EncodingTypeJSON)
if err != nil {
return nil, err
}

return &InternalWorkflowMutation{
ExecutionInfo: serializedExecutionInfo,
Expand Down Expand Up @@ -662,8 +673,9 @@ func (m *executionManagerImpl) SerializeWorkflowMutation(
ReplicationTasks: input.ReplicationTasks,
TimerTasks: input.TimerTasks,

Condition: input.Condition,
Checksum: input.Checksum,
Condition: input.Condition,
Checksum: input.Checksum,
ChecksumData: checksumData,
}, nil
}

Expand Down Expand Up @@ -702,6 +714,11 @@ func (m *executionManagerImpl) SerializeWorkflowSnapshot(
return nil, err
}

checksumData, err := m.serializer.SerializeChecksum(input.Checksum, common.EncodingTypeJSON)
if err != nil {
return nil, err
}

return &InternalWorkflowSnapshot{
ExecutionInfo: serializedExecutionInfo,
VersionHistories: serializedVersionHistories,
Expand All @@ -720,8 +737,9 @@ func (m *executionManagerImpl) SerializeWorkflowSnapshot(
ReplicationTasks: input.ReplicationTasks,
TimerTasks: input.TimerTasks,

Condition: input.Condition,
Checksum: input.Checksum,
Condition: input.Condition,
Checksum: input.Checksum,
ChecksumData: checksumData,
}, nil
}

Expand Down
5 changes: 0 additions & 5 deletions common/persistence/persistence-tests/executionManagerTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,6 @@ func (s *ExecutionManagerSuite) newRandomChecksum() checksum.Checksum {
}

func (s *ExecutionManagerSuite) assertChecksumsEqual(expected checksum.Checksum, actual checksum.Checksum) {
if !actual.Flavor.IsValid() {
// not all stores support checksum persistence today
// if its not supported, assert that everything is zero'd out
expected = checksum.Checksum{}
}
s.EqualValues(expected, actual)
}

Expand Down
2 changes: 2 additions & 0 deletions common/persistence/persistence-tests/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -1256,6 +1256,7 @@ func (s *TestBase) UpdateWorkflowExecutionForBufferEvents(
Condition: condition,
ClearBufferedEvents: clearBufferedEvents,
VersionHistories: versionHistories,
Checksum: testWorkflowChecksum,
},
RangeID: s.ShardInfo.RangeID,
Encoding: pickRandomEncoding(),
Expand Down Expand Up @@ -1307,6 +1308,7 @@ func (s *TestBase) UpdateAllMutableState(ctx context.Context, updatedMutableStat
UpsertSignalInfos: sInfos,
UpsertSignalRequestedIDs: srIDs,
VersionHistories: updatedMutableState.VersionHistories,
Checksum: updatedMutableState.Checksum,
},
Encoding: pickRandomEncoding(),
})
Expand Down
16 changes: 16 additions & 0 deletions common/persistence/serialization/getters.go
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,22 @@ func (w *WorkflowExecutionInfo) GetPartitionConfig() (o map[string]string) {
return
}

// GetCheckSum internal sql blob getter
func (w *WorkflowExecutionInfo) GetChecksum() (o []byte) {
if w != nil {
return w.Checksum
}
return
}

// GetCheckSumEncoding internal sql blob getter
func (w *WorkflowExecutionInfo) GetChecksumEncoding() (o string) {
if w != nil {
return w.ChecksumEncoding
}
return
}

// GetVersion internal sql blob getter
func (a *ActivityInfo) GetVersion() (o int64) {
if a != nil {
Expand Down
6 changes: 6 additions & 0 deletions common/persistence/serialization/getters_fixtures_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ var expectedNil = map[string]map[string]any{
"GetVersionHistoriesEncoding": "",
"GetWorkflowTimeout": time.Duration(0),
"GetWorkflowTypeName": "",
"GetChecksum": []uint8(nil),
"GetChecksumEncoding": "",
},
"*serialization.TimerTaskInfo": {
"GetDomainID": []uint8(nil),
Expand Down Expand Up @@ -320,6 +322,8 @@ var expectedEmpty = map[string]map[string]any{
"GetVersionHistoriesEncoding": "",
"GetWorkflowTimeout": time.Duration(0),
"GetWorkflowTypeName": "",
"GetChecksum": []uint8(nil),
"GetChecksumEncoding": "",
},
"*serialization.TimerTaskInfo": {
"GetDomainID": []uint8(nil),
Expand Down Expand Up @@ -547,6 +551,8 @@ var expectedNonEmpty = map[string]map[string]any{
"GetVersionHistoriesEncoding": "",
"GetWorkflowTimeout": time.Duration(3),
"GetWorkflowTypeName": "workflowTypeName",
"GetChecksum": []uint8(nil),
"GetChecksumEncoding": "",
},
"*serialization.TimerTaskInfo": {
"GetDomainID": []byte(taskDomainID),
Expand Down
2 changes: 2 additions & 0 deletions common/persistence/serialization/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ type (
VersionHistoriesEncoding string
FirstExecutionRunID UUID
PartitionConfig map[string]string
Checksum []byte
ChecksumEncoding string
}

// ActivityInfo blob in a serialization agnostic format
Expand Down
4 changes: 4 additions & 0 deletions common/persistence/serialization/thrift_mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,8 @@ func workflowExecutionInfoToThrift(info *WorkflowExecutionInfo) *sqlblobs.Workfl
VersionHistoriesEncoding: &info.VersionHistoriesEncoding,
FirstExecutionRunID: info.FirstExecutionRunID,
PartitionConfig: info.PartitionConfig,
Checksum: info.Checksum,
ChecksumEncoding: &info.ChecksumEncoding,
}
}

Expand Down Expand Up @@ -325,6 +327,8 @@ func workflowExecutionInfoFromThrift(info *sqlblobs.WorkflowExecutionInfo) *Work
FirstExecutionRunID: info.FirstExecutionRunID,
PartitionConfig: info.PartitionConfig,
IsCron: info.GetCronSchedule() != "",
Checksum: info.Checksum,
ChecksumEncoding: info.GetChecksumEncoding(),
}
}

Expand Down
4 changes: 4 additions & 0 deletions common/persistence/serialization/thrift_mapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,8 @@ func TestWorkflowExecutionInfo(t *testing.T) {
VersionHistoriesEncoding: "VersionHistoriesEncoding",
FirstExecutionRunID: UUID(uuid.New()),
PartitionConfig: map[string]string{"zone": "dca1"},
Checksum: []byte("Checksum"),
ChecksumEncoding: "ChecksumEncoding",
}
actual := workflowExecutionInfoFromThrift(workflowExecutionInfoToThrift(expected))
assert.Equal(t, expected.ParentDomainID, actual.ParentDomainID)
Expand Down Expand Up @@ -293,6 +295,8 @@ func TestWorkflowExecutionInfo(t *testing.T) {
assert.True(t, (expected.RetryExpiration-actual.RetryExpiration) < time.Second)
assert.Equal(t, expected.FirstExecutionRunID, actual.FirstExecutionRunID)
assert.Equal(t, expected.PartitionConfig, actual.PartitionConfig)
assert.Equal(t, expected.Checksum, actual.Checksum)
assert.Equal(t, expected.ChecksumEncoding, actual.ChecksumEncoding)
}

func TestActivityInfo(t *testing.T) {
Expand Down
26 changes: 26 additions & 0 deletions common/persistence/serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/uber/cadence/.gen/go/replicator"
workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/checksum"
"github.com/uber/cadence/common/codec"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/common/types/mapper/thrift"
Expand Down Expand Up @@ -81,6 +82,10 @@ type (
// serialize/deserialize async workflow configuration
SerializeAsyncWorkflowsConfig(config *types.AsyncWorkflowConfiguration, encodingType common.EncodingType) (*DataBlob, error)
DeserializeAsyncWorkflowsConfig(data *DataBlob) (*types.AsyncWorkflowConfiguration, error)

// serialize/deserialize checksum
SerializeChecksum(sum checksum.Checksum, encodingType common.EncodingType) (*DataBlob, error)
DeserializeChecksum(data *DataBlob) (checksum.Checksum, error)
}

// CadenceSerializationError is an error type for cadence serialization
Expand Down Expand Up @@ -298,6 +303,27 @@ func (t *serializerImpl) DeserializeAsyncWorkflowsConfig(data *DataBlob) (*types
return &cfg, err
}

func (t *serializerImpl) SerializeChecksum(sum checksum.Checksum, encodingType common.EncodingType) (*DataBlob, error) {
return t.serialize(sum, encodingType)
}

func (t *serializerImpl) DeserializeChecksum(data *DataBlob) (checksum.Checksum, error) {
if data == nil {
return checksum.Checksum{}, nil
}

var sum checksum.Checksum
if len(data.Data) == 0 {
return sum, nil
}

err := t.deserialize(data, &sum)
if err != nil {
return checksum.Checksum{}, err
}
return sum, err
}

func (t *serializerImpl) serialize(input interface{}, encodingType common.EncodingType) (*DataBlob, error) {
if input == nil {
return nil, nil
Expand Down
26 changes: 26 additions & 0 deletions common/persistence/serializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/checksum"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/types"
)
Expand Down Expand Up @@ -210,13 +211,30 @@ func TestSerializers(t *testing.T) {
return serializer.DeserializeAsyncWorkflowsConfig(data)
},
},
{
name: "checksum",
payloads: map[string]any{
"normal": generateChecksum(),
},
serializeFn: func(payload any, encoding common.EncodingType) (*DataBlob, error) {
return serializer.SerializeChecksum(payload.(checksum.Checksum), encoding)
},
deserializeFn: func(data *DataBlob) (any, error) {
return serializer.DeserializeChecksum(data)
},
},
}

// generate runnable test cases here so actual test body is not 3 level nested
var runnableTests []runnableTest
for _, td := range tests {
for encoding, supported := range encodingTypes {
for payloadName, payload := range td.payloads {
if _, ok := payload.(checksum.Checksum); ok {
if encoding != common.EncodingTypeJSON {
continue
}
}
runnableTests = append(runnableTests, runnableTest{
testDef: td,
encoding: encoding,
Expand Down Expand Up @@ -468,3 +486,11 @@ func generateAsyncWorkflowConfig() *types.AsyncWorkflowConfiguration {
},
}
}

func generateChecksum() checksum.Checksum {
return checksum.Checksum{
Flavor: checksum.FlavorIEEECRC32OverThriftBinary,
Version: 1,
Value: []byte("test-checksum"),
}
}
7 changes: 7 additions & 0 deletions common/persistence/sql/sql_execution_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1446,6 +1446,13 @@ func (m *sqlExecutionStore) populateWorkflowMutableState(
)
}

if info.GetChecksum() != nil {
state.ChecksumData = p.NewDataBlob(
info.GetChecksum(),
common.EncodingType(info.GetChecksumEncoding()),
)
}

return state, nil
}

Expand Down
Loading

0 comments on commit d3bff7d

Please sign in to comment.