From 194af223eef147dba33ba6bf6d75d63fbadf249f Mon Sep 17 00:00:00 2001 From: taylan isikdemir Date: Thu, 22 Feb 2024 15:16:50 -0800 Subject: [PATCH] rebase master --- .../nosql/nosqlplugin/cassandra/workflow.go | 62 +---- .../nosqlplugin/cassandra/workflow_test.go | 220 ++++++++++++++++++ .../nosqlplugin/cassandra/workflow_utils.go | 12 +- .../cassandra/workflow_utils_test.go | 34 ++- 4 files changed, 250 insertions(+), 78 deletions(-) create mode 100644 common/persistence/nosql/nosqlplugin/cassandra/workflow_test.go diff --git a/common/persistence/nosql/nosqlplugin/cassandra/workflow.go b/common/persistence/nosql/nosqlplugin/cassandra/workflow.go index 52027c9a608..be71653a4e0 100644 --- a/common/persistence/nosql/nosqlplugin/cassandra/workflow.go +++ b/common/persistence/nosql/nosqlplugin/cassandra/workflow.go @@ -62,27 +62,11 @@ func (db *cdb) InsertWorkflowExecutionWithTasks( return err } - err = createTransferTasks(batch, shardID, domainID, workflowID, transferTasks) - if err != nil { - return err - } - err = createReplicationTasks(batch, shardID, domainID, workflowID, replicationTasks) - if err != nil { - return err - } - err = createCrossClusterTasks(batch, shardID, domainID, workflowID, crossClusterTasks) - if err != nil { - return err - } - err = createTimerTasks(batch, shardID, domainID, workflowID, timerTasks) - if err != nil { - return err - } - - err = assertShardRangeID(batch, shardID, shardCondition.RangeID) - if err != nil { - return err - } + createTransferTasks(batch, shardID, domainID, workflowID, transferTasks) + createReplicationTasks(batch, shardID, domainID, workflowID, replicationTasks) + createCrossClusterTasks(batch, shardID, domainID, workflowID, crossClusterTasks) + createTimerTasks(batch, shardID, domainID, workflowID, timerTasks) + assertShardRangeID(batch, shardID, shardCondition.RangeID) return executeCreateWorkflowBatchTransaction(db.session, batch, currentWorkflowRequest, execution, shardCondition) } @@ -179,27 +163,11 @@ func (db *cdb) UpdateWorkflowExecutionWithTasks( } } - err = createTransferTasks(batch, shardID, domainID, workflowID, transferTasks) - if err != nil { - return err - } - err = createReplicationTasks(batch, shardID, domainID, workflowID, replicationTasks) - if err != nil { - return err - } - err = createCrossClusterTasks(batch, shardID, domainID, workflowID, crossClusterTasks) - if err != nil { - return err - } - err = createTimerTasks(batch, shardID, domainID, workflowID, timerTasks) - if err != nil { - return err - } - - err = assertShardRangeID(batch, shardID, shardCondition.RangeID) - if err != nil { - return err - } + createTransferTasks(batch, shardID, domainID, workflowID, transferTasks) + createReplicationTasks(batch, shardID, domainID, workflowID, replicationTasks) + createCrossClusterTasks(batch, shardID, domainID, workflowID, crossClusterTasks) + createTimerTasks(batch, shardID, domainID, workflowID, timerTasks) + assertShardRangeID(batch, shardID, shardCondition.RangeID) return executeUpdateWorkflowBatchTransaction(db.session, batch, currentWorkflowRequest, previousNextEventIDCondition, shardCondition) } @@ -751,16 +719,10 @@ func (db *cdb) InsertReplicationTask(ctx context.Context, tasks []*nosqlplugin.R shardID := shardCondition.ShardID batch := db.session.NewBatch(gocql.LoggedBatch).WithContext(ctx) for _, task := range tasks { - err := createReplicationTasks(batch, shardID, task.DomainID, task.WorkflowID, []*nosqlplugin.ReplicationTask{task}) - if err != nil { - return err - } + createReplicationTasks(batch, shardID, task.DomainID, task.WorkflowID, []*nosqlplugin.ReplicationTask{task}) } - err := assertShardRangeID(batch, shardID, shardCondition.RangeID) - if err != nil { - return err - } + assertShardRangeID(batch, shardID, shardCondition.RangeID) previous := make(map[string]interface{}) applied, iter, err := db.session.MapExecuteBatchCAS(batch, previous) diff --git a/common/persistence/nosql/nosqlplugin/cassandra/workflow_test.go b/common/persistence/nosql/nosqlplugin/cassandra/workflow_test.go new file mode 100644 index 00000000000..d495d7ccbcc --- /dev/null +++ b/common/persistence/nosql/nosqlplugin/cassandra/workflow_test.go @@ -0,0 +1,220 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package cassandra + +import ( + "context" + "errors" + "testing" + + "github.com/golang/mock/gomock" + + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/checksum" + "github.com/uber/cadence/common/config" + "github.com/uber/cadence/common/log/testlogger" + "github.com/uber/cadence/common/persistence" + "github.com/uber/cadence/common/persistence/nosql/nosqlplugin" + "github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql" +) + +func TestInsertWorkflowExecutionWithTasks(t *testing.T) { + tests := []struct { + name string + request *nosqlplugin.CurrentWorkflowWriteRequest + execution *nosqlplugin.WorkflowExecutionRequest + transferTasks []*nosqlplugin.TransferTask + crossClusterTasks []*nosqlplugin.CrossClusterTask + replicationTasks []*nosqlplugin.ReplicationTask + timerTasks []*nosqlplugin.TimerTask + shardCondition *nosqlplugin.ShardCondition + mapExecuteBatchCASErr error + wantErr bool + }{ + { + name: "success", + request: &nosqlplugin.CurrentWorkflowWriteRequest{ + WriteMode: nosqlplugin.CurrentWorkflowWriteModeNoop, + }, + shardCondition: &nosqlplugin.ShardCondition{ + ShardID: 1, + }, + execution: &nosqlplugin.WorkflowExecutionRequest{ + InternalWorkflowExecutionInfo: persistence.InternalWorkflowExecutionInfo{ + DomainID: "test-domain-id", + WorkflowID: "test-workflow-id", + CompletionEvent: &persistence.DataBlob{ + Encoding: common.EncodingTypeThriftRW, + Data: []byte("test-completion-event"), + }, + AutoResetPoints: &persistence.DataBlob{ + Encoding: common.EncodingTypeThriftRW, + Data: []byte("test-auto-reset-points"), + }, + }, + VersionHistories: &persistence.DataBlob{ + Encoding: common.EncodingTypeThriftRW, + Data: []byte("test-version-histories"), + }, + Checksums: &checksum.Checksum{ + Version: 1, + Flavor: checksum.FlavorIEEECRC32OverThriftBinary, + Value: []byte("test-checksum"), + }, + }, + }, + { + name: "createOrUpdateCurrentWorkflow step fails", + request: &nosqlplugin.CurrentWorkflowWriteRequest{ + WriteMode: nosqlplugin.CurrentWorkflowWriteMode(-999), // unknown mode will cause failure + }, + shardCondition: &nosqlplugin.ShardCondition{ + ShardID: 1, + }, + execution: &nosqlplugin.WorkflowExecutionRequest{ + InternalWorkflowExecutionInfo: persistence.InternalWorkflowExecutionInfo{ + DomainID: "test-domain-id", + WorkflowID: "test-workflow-id", + CompletionEvent: &persistence.DataBlob{ + Encoding: common.EncodingTypeThriftRW, + Data: []byte("test-completion-event"), + }, + AutoResetPoints: &persistence.DataBlob{ + Encoding: common.EncodingTypeThriftRW, + Data: []byte("test-auto-reset-points"), + }, + }, + VersionHistories: &persistence.DataBlob{ + Encoding: common.EncodingTypeThriftRW, + Data: []byte("test-version-histories"), + }, + Checksums: &checksum.Checksum{ + Version: 1, + Flavor: checksum.FlavorIEEECRC32OverThriftBinary, + Value: []byte("test-checksum"), + }, + }, + wantErr: true, + }, + { + name: "createWorkflowExecutionWithMergeMaps step fails", + request: &nosqlplugin.CurrentWorkflowWriteRequest{ + WriteMode: nosqlplugin.CurrentWorkflowWriteModeNoop, + }, + shardCondition: &nosqlplugin.ShardCondition{ + ShardID: 1, + }, + execution: &nosqlplugin.WorkflowExecutionRequest{ + EventBufferWriteMode: nosqlplugin.EventBufferWriteModeAppend, // this will cause failure + InternalWorkflowExecutionInfo: persistence.InternalWorkflowExecutionInfo{ + DomainID: "test-domain-id", + WorkflowID: "test-workflow-id", + CompletionEvent: &persistence.DataBlob{ + Encoding: common.EncodingTypeThriftRW, + Data: []byte("test-completion-event"), + }, + AutoResetPoints: &persistence.DataBlob{ + Encoding: common.EncodingTypeThriftRW, + Data: []byte("test-auto-reset-points"), + }, + }, + VersionHistories: &persistence.DataBlob{ + Encoding: common.EncodingTypeThriftRW, + Data: []byte("test-version-histories"), + }, + Checksums: &checksum.Checksum{ + Version: 1, + Flavor: checksum.FlavorIEEECRC32OverThriftBinary, + Value: []byte("test-checksum"), + }, + }, + wantErr: true, + }, + { + name: "executeCreateWorkflowBatchTransaction step fails", + mapExecuteBatchCASErr: errors.New("some random error"), // this will cause failure + request: &nosqlplugin.CurrentWorkflowWriteRequest{ + WriteMode: nosqlplugin.CurrentWorkflowWriteModeNoop, + }, + shardCondition: &nosqlplugin.ShardCondition{ + ShardID: 1, + }, + execution: &nosqlplugin.WorkflowExecutionRequest{ + EventBufferWriteMode: nosqlplugin.EventBufferWriteModeNone, + InternalWorkflowExecutionInfo: persistence.InternalWorkflowExecutionInfo{ + DomainID: "test-domain-id", + WorkflowID: "test-workflow-id", + CompletionEvent: &persistence.DataBlob{ + Encoding: common.EncodingTypeThriftRW, + Data: []byte("test-completion-event"), + }, + AutoResetPoints: &persistence.DataBlob{ + Encoding: common.EncodingTypeThriftRW, + Data: []byte("test-auto-reset-points"), + }, + }, + VersionHistories: &persistence.DataBlob{ + Encoding: common.EncodingTypeThriftRW, + Data: []byte("test-version-histories"), + }, + Checksums: &checksum.Checksum{ + Version: 1, + Flavor: checksum.FlavorIEEECRC32OverThriftBinary, + Value: []byte("test-checksum"), + }, + }, + wantErr: true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + session := &fakeSession{ + iter: &fakeIter{}, + mapExecuteBatchCASApplied: true, + mapExecuteBatchCASErr: tc.mapExecuteBatchCASErr, + } + client := gocql.NewMockClient(ctrl) + cfg := &config.NoSQL{} + logger := testlogger.New(t) + dc := &persistence.DynamicConfiguration{} + + db := newCassandraDBFromSession(cfg, session, logger, dc, dbWithClient(client)) + + err := db.InsertWorkflowExecutionWithTasks( + context.Background(), + tc.request, + tc.execution, + tc.transferTasks, + tc.crossClusterTasks, + tc.replicationTasks, + tc.timerTasks, + tc.shardCondition, + ) + + if (err != nil) != tc.wantErr { + t.Errorf("InsertWorkflowExecutionWithTasks() error = %v, wantErr %v", err, tc.wantErr) + } + }) + } +} diff --git a/common/persistence/nosql/nosqlplugin/cassandra/workflow_utils.go b/common/persistence/nosql/nosqlplugin/cassandra/workflow_utils.go index 97e7eb1c0c8..8ad829614cc 100644 --- a/common/persistence/nosql/nosqlplugin/cassandra/workflow_utils.go +++ b/common/persistence/nosql/nosqlplugin/cassandra/workflow_utils.go @@ -292,7 +292,7 @@ func newUnknownConditionFailureReason( } } -func assertShardRangeID(batch gocql.Batch, shardID int, rangeID int64) error { +func assertShardRangeID(batch gocql.Batch, shardID int, rangeID int64) { batch.Query(templateUpdateLeaseQuery, rangeID, shardID, @@ -304,7 +304,6 @@ func assertShardRangeID(batch gocql.Batch, shardID int, rangeID int64) error { rowTypeShardTaskID, rangeID, ) - return nil } func createTimerTasks( @@ -346,7 +345,7 @@ func createReplicationTasks( domainID string, workflowID string, replicationTasks []*nosqlplugin.ReplicationTask, -) error { +) { for _, task := range replicationTasks { batch.Query(templateCreateReplicationTaskQuery, shardID, @@ -372,7 +371,6 @@ func createReplicationTasks( defaultVisibilityTimestamp, task.TaskID) } - return nil } func createTransferTasks( @@ -381,7 +379,7 @@ func createTransferTasks( domainID string, workflowID string, transferTasks []*nosqlplugin.TransferTask, -) error { +) { for _, task := range transferTasks { batch.Query(templateCreateTransferTaskQuery, shardID, @@ -408,7 +406,6 @@ func createTransferTasks( defaultVisibilityTimestamp, task.TaskID) } - return nil } func createCrossClusterTasks( @@ -417,7 +414,7 @@ func createCrossClusterTasks( domainID string, workflowID string, xClusterTasks []*nosqlplugin.CrossClusterTask, -) error { +) { for _, task := range xClusterTasks { batch.Query(templateCreateCrossClusterTaskQuery, shardID, @@ -445,7 +442,6 @@ func createCrossClusterTasks( task.TaskID, ) } - return nil } func resetSignalsRequested( diff --git a/common/persistence/nosql/nosqlplugin/cassandra/workflow_utils_test.go b/common/persistence/nosql/nosqlplugin/cassandra/workflow_utils_test.go index f82c68454d5..0689ada69bc 100644 --- a/common/persistence/nosql/nosqlplugin/cassandra/workflow_utils_test.go +++ b/common/persistence/nosql/nosqlplugin/cassandra/workflow_utils_test.go @@ -48,6 +48,9 @@ type fakeSession struct { mapExecuteBatchCASApplied bool mapExecuteBatchCASPrev map[string]any mapExecuteBatchCASErr error + + // outputs + batches []*fakeBatch } func (s *fakeSession) Query(string, ...interface{}) gocql.Query { @@ -55,7 +58,9 @@ func (s *fakeSession) Query(string, ...interface{}) gocql.Query { } func (s *fakeSession) NewBatch(gocql.BatchType) gocql.Batch { - return nil + b := &fakeBatch{} + s.batches = append(s.batches, b) + return b } func (s *fakeSession) ExecuteBatch(gocql.Batch) error { @@ -98,17 +103,17 @@ func (b *fakeBatch) Query(queryTmpl string, args ...interface{}) { // WithContext is fake implementation of gocql.Batch.WithContext func (b *fakeBatch) WithContext(context.Context) gocql.Batch { - return nil + return b } // WithTimestamp is fake implementation of gocql.Batch.WithTimestamp func (b *fakeBatch) WithTimestamp(int64) gocql.Batch { - return nil + return b } // Consistency is fake implementation of gocql.Batch.Consistency func (b *fakeBatch) Consistency(gocql.Consistency) gocql.Batch { - return nil + return b } // fakeQuery is fake implementation of gocql.Query @@ -673,11 +678,7 @@ func TestReplicationTasks(t *testing.T) { for _, tc := range tests { t.Run(tc.desc, func(t *testing.T) { batch := &fakeBatch{} - err := createReplicationTasks(batch, tc.shardID, tc.domainID, tc.workflowID, tc.replTasks) - if err != nil { - t.Fatalf("createReplicationTasks failed: %v", err) - } - + createReplicationTasks(batch, tc.shardID, tc.domainID, tc.workflowID, tc.replTasks) if diff := cmp.Diff(tc.wantQueries, batch.queries); diff != "" { t.Fatalf("Query mismatch (-want +got):\n%s", diff) } @@ -755,11 +756,7 @@ func TestTransferTasks(t *testing.T) { for _, tc := range tests { t.Run(tc.desc, func(t *testing.T) { batch := &fakeBatch{} - err := createTransferTasks(batch, tc.shardID, tc.domainID, tc.workflowID, tc.transferTasks) - if err != nil { - t.Fatalf("createTransferTasks failed: %v", err) - } - + createTransferTasks(batch, tc.shardID, tc.domainID, tc.workflowID, tc.transferTasks) if diff := cmp.Diff(tc.wantQueries, batch.queries); diff != "" { t.Fatalf("Query mismatch (-want +got):\n%s", diff) } @@ -819,11 +816,7 @@ func TestCrossClusterTasks(t *testing.T) { for _, tc := range tests { t.Run(tc.desc, func(t *testing.T) { batch := &fakeBatch{} - err := createCrossClusterTasks(batch, tc.shardID, tc.domainID, tc.workflowID, tc.xClusterTasks) - if err != nil { - t.Fatalf("createCrossClusterTasks failed: %v", err) - } - + createCrossClusterTasks(batch, tc.shardID, tc.domainID, tc.workflowID, tc.xClusterTasks) if diff := cmp.Diff(tc.wantQueries, batch.queries); diff != "" { t.Fatalf("Query mismatch (-want +got):\n%s", diff) } @@ -2365,6 +2358,7 @@ func TestCreateOrUpdateWorkflowExecution(t *testing.T) { Condition: &nosqlplugin.CurrentWorkflowWriteCondition{ CurrentRunID: common.StringPtr("runid1"), LastWriteVersion: common.Int64Ptr(1), + State: common.IntPtr(persistence.WorkflowStateCreated), }, Row: nosqlplugin.CurrentWorkflowRow{ RunID: "runid1", @@ -2381,7 +2375,7 @@ func TestCreateOrUpdateWorkflowExecution(t *testing.T) { `WHERE ` + `shard_id = 1000 and type = 1 and domain_id = domain1 and workflow_id = workflow1 and ` + `run_id = 30000000-0000-f000-f000-000000000001 and visibility_ts = 946684800000 and task_id = -10 ` + - `IF current_run_id = runid1 `, + `IF current_run_id = runid1 and workflow_last_write_version = 1 and workflow_state = 0 `, }, }, {