From ecbe83df11cc8687d25d0e2fa8bb0bbeb7e0af07 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Thu, 2 Nov 2023 19:40:35 -0400 Subject: [PATCH] Properly handle multiple streams in UpdateVReplicationWorkflow Signed-off-by: Matt Lord --- go/textutil/strings.go | 6 +- .../tabletmanager/rpc_vreplication.go | 133 +++++++++--------- .../tabletmanager/rpc_vreplication_test.go | 56 +++++--- 3 files changed, 110 insertions(+), 85 deletions(-) diff --git a/go/textutil/strings.go b/go/textutil/strings.go index bb2e24ba477..ac35541f52f 100644 --- a/go/textutil/strings.go +++ b/go/textutil/strings.go @@ -23,8 +23,8 @@ import ( "unicode" "vitess.io/vitess/go/sqltypes" - "vitess.io/vitess/go/vt/proto/binlogdata" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) @@ -94,7 +94,7 @@ func ValueIsSimulatedNull(val any) bool { return cval == SimulatedNullString case []string: return len(cval) == 1 && cval[0] == sqltypes.NULL.String() - case binlogdata.OnDDLAction: + case binlogdatapb.OnDDLAction: return int32(cval) == int32(SimulatedNullInt) case int: return cval == SimulatedNullInt @@ -104,6 +104,8 @@ func ValueIsSimulatedNull(val any) bool { return int64(cval) == int64(SimulatedNullInt) case []topodatapb.TabletType: return len(cval) == 1 && cval[0] == topodatapb.TabletType(SimulatedNullInt) + case binlogdatapb.VReplicationWorkflowState: + return int32(cval) == int32(SimulatedNullInt) default: return false } diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication.go b/go/vt/vttablet/tabletmanager/rpc_vreplication.go index bbcea8bd0d6..b18caa1063f 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication.go @@ -45,10 +45,10 @@ const ( sqlReadVReplicationWorkflow = "select id, source, pos, stop_pos, max_tps, max_replication_lag, cell, tablet_types, time_updated, transaction_timestamp, state, message, db_name, rows_copied, tags, time_heartbeat, workflow_type, time_throttled, component_throttled, workflow_sub_type, defer_secondary_keys from %s.vreplication where workflow = %a and db_name = %a" // Delete VReplication records for the given workflow. sqlDeleteVReplicationWorkflow = "delete from %s.vreplication where workflow = %a and db_name = %a" - // Retrieve the current configuration values for a workflow's vreplication stream. + // Retrieve the current configuration values for a workflow's vreplication stream(s). sqlSelectVReplicationWorkflowConfig = "select id, source, cell, tablet_types, state, message from %s.vreplication where workflow = %a" // Update the configuration values for a workflow's vreplication stream. - sqlUpdateVReplicationWorkflowConfig = "update %s.vreplication set state = %a, source = %a, cell = %a, tablet_types = %a where id = %a" + sqlUpdateVReplicationWorkflowStreamConfig = "update %s.vreplication set state = %a, source = %a, cell = %a, tablet_types = %a where id = %a" ) func (tm *TabletManager) CreateVReplicationWorkflow(ctx context.Context, req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) (*tabletmanagerdatapb.CreateVReplicationWorkflowResponse, error) { @@ -228,8 +228,8 @@ func (tm *TabletManager) ReadVReplicationWorkflow(ctx context.Context, req *tabl } // UpdateVReplicationWorkflow updates the sidecar databases's vreplication -// record for this tablet's vreplication workflow stream(s). If there -// is no stream for the given workflow on the tablet then a nil result +// record(s) for this tablet's vreplication workflow stream(s). If there +// are no streams for the given workflow on the tablet then a nil result // is returned as this is expected e.g. on source tablets of a // Reshard workflow (source and target are the same keyspace). The // caller can consider this case an error if they choose to. @@ -257,68 +257,73 @@ func (tm *TabletManager) UpdateVReplicationWorkflow(ctx context.Context, req *ta return &tabletmanagerdatapb.UpdateVReplicationWorkflowResponse{Result: nil}, nil } - row := res.Named().Row() - id := row.AsInt64("id", 0) - cells := strings.Split(row.AsString("cell", ""), ",") - tabletTypes, inorder, err := discovery.ParseTabletTypesAndOrder(row.AsString("tablet_types", "")) - if err != nil { - return nil, err - } - bls := &binlogdatapb.BinlogSource{} - source := row.AsBytes("source", []byte{}) - state := row.AsString("state", "") - message := row.AsString("message", "") - if req.State == binlogdatapb.VReplicationWorkflowState_Running && strings.ToUpper(message) == workflow.Frozen { - return &tabletmanagerdatapb.UpdateVReplicationWorkflowResponse{Result: nil}, - vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "cannot start a workflow when it is frozen") - } - // For the string based values, we use NULL to differentiate - // from an empty string. The NULL value indicates that we - // should keep the existing value. - if !textutil.ValueIsSimulatedNull(req.Cells) { - cells = req.Cells - } - if !textutil.ValueIsSimulatedNull(req.TabletTypes) { - tabletTypes = req.TabletTypes - } - tabletTypesStr := topoproto.MakeStringTypeCSV(tabletTypes) - if inorder && req.TabletSelectionPreference == tabletmanagerdatapb.TabletSelectionPreference_UNKNOWN || - req.TabletSelectionPreference == tabletmanagerdatapb.TabletSelectionPreference_INORDER { - tabletTypesStr = discovery.InOrderHint + tabletTypesStr - } - if err = prototext.Unmarshal(source, bls); err != nil { - return nil, err - } - // If we don't want to update the existing value then pass - // the simulated NULL value of -1. - if !textutil.ValueIsSimulatedNull(req.OnDdl) { - bls.OnDdl = req.OnDdl - } - source, err = prototext.Marshal(bls) - if err != nil { - return nil, err - } - if !textutil.ValueIsSimulatedNull(req.State) { - state = binlogdatapb.VReplicationWorkflowState_name[int32(req.State)] - } - bindVars = map[string]*querypb.BindVariable{ - "st": sqltypes.StringBindVariable(state), - "sc": sqltypes.StringBindVariable(string(source)), - "cl": sqltypes.StringBindVariable(strings.Join(cells, ",")), - "tt": sqltypes.StringBindVariable(tabletTypesStr), - "id": sqltypes.Int64BindVariable(id), - } - parsed = sqlparser.BuildParsedQuery(sqlUpdateVReplicationWorkflowConfig, sidecar.GetIdentifier(), ":st", ":sc", ":cl", ":tt", ":id") - stmt, err = parsed.GenerateQuery(bindVars, nil) - if err != nil { - return nil, err + for _, row := range res.Named().Rows { + id := row.AsInt64("id", 0) + cells := strings.Split(row.AsString("cell", ""), ",") + tabletTypes, inorder, err := discovery.ParseTabletTypesAndOrder(row.AsString("tablet_types", "")) + if err != nil { + return nil, err + } + bls := &binlogdatapb.BinlogSource{} + source := row.AsBytes("source", []byte{}) + state := row.AsString("state", "") + message := row.AsString("message", "") + if req.State == binlogdatapb.VReplicationWorkflowState_Running && strings.ToUpper(message) == workflow.Frozen { + return &tabletmanagerdatapb.UpdateVReplicationWorkflowResponse{Result: nil}, + vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, "cannot start a workflow when it is frozen") + } + // For the string based values, we use NULL to differentiate + // from an empty string. The NULL value indicates that we + // should keep the existing value. + if !textutil.ValueIsSimulatedNull(req.Cells) { + cells = req.Cells + } + if !textutil.ValueIsSimulatedNull(req.TabletTypes) { + tabletTypes = req.TabletTypes + } + tabletTypesStr := topoproto.MakeStringTypeCSV(tabletTypes) + if inorder && req.TabletSelectionPreference == tabletmanagerdatapb.TabletSelectionPreference_UNKNOWN || + req.TabletSelectionPreference == tabletmanagerdatapb.TabletSelectionPreference_INORDER { + tabletTypesStr = discovery.InOrderHint + tabletTypesStr + } + if err = prototext.Unmarshal(source, bls); err != nil { + return nil, err + } + // If we don't want to update the existing value then pass + // the simulated NULL value of -1. + if !textutil.ValueIsSimulatedNull(req.OnDdl) { + bls.OnDdl = req.OnDdl + } + source, err = prototext.Marshal(bls) + if err != nil { + return nil, err + } + if !textutil.ValueIsSimulatedNull(req.State) { + state = binlogdatapb.VReplicationWorkflowState_name[int32(req.State)] + } + bindVars = map[string]*querypb.BindVariable{ + "st": sqltypes.StringBindVariable(state), + "sc": sqltypes.StringBindVariable(string(source)), + "cl": sqltypes.StringBindVariable(strings.Join(cells, ",")), + "tt": sqltypes.StringBindVariable(tabletTypesStr), + "id": sqltypes.Int64BindVariable(id), + } + parsed = sqlparser.BuildParsedQuery(sqlUpdateVReplicationWorkflowStreamConfig, sidecar.GetIdentifier(), ":st", ":sc", ":cl", ":tt", ":id") + stmt, err = parsed.GenerateQuery(bindVars, nil) + if err != nil { + return nil, err + } + res, err = tm.VREngine.Exec(stmt) + if err != nil { + return nil, err + } } - res, err = tm.VREngine.Exec(stmt) - if err != nil { - return nil, err - } - return &tabletmanagerdatapb.UpdateVReplicationWorkflowResponse{Result: sqltypes.ResultToProto3(res)}, nil + return &tabletmanagerdatapb.UpdateVReplicationWorkflowResponse{ + Result: &querypb.QueryResult{ + RowsAffected: uint64(len(res.Rows)), + }, + }, nil } // VReplicationExec executes a vreplication command. diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go index 1f985733371..024a6eb1955 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go @@ -479,13 +479,14 @@ func TestUpdateVReplicationWorkflow(t *testing.T) { keyspace, shard) selectRes := sqltypes.MakeTestResult( sqltypes.MakeTestFields( - "id|source|cell|tablet_types", - "int64|varchar|varchar|varchar", + "id|source|cell|tablet_types|state|message", + "int64|varchar|varchar|varchar|varchar|varbinary", ), - fmt.Sprintf("%d|%s|%s|%s", vreplID, blsStr, cells[0], tabletTypes[0]), + fmt.Sprintf("%d|%s|%s|%s|Running|", vreplID, blsStr, cells[0], tabletTypes[0]), + fmt.Sprintf("%d|%s|%s|%s|Running|", vreplID+1, blsStr, cells[0], tabletTypes[0]), ) - idQuery, err := sqlparser.ParseAndBind("select id from _vt.vreplication where id = %a", - sqltypes.Int64BindVariable(int64(vreplID))) + idQuery, err := sqlparser.ParseAndBind("select id from _vt.vreplication where workflow = %a", + sqltypes.StringBindVariable(workflow)) require.NoError(t, err) idRes := sqltypes.MakeTestResult( sqltypes.MakeTestFields( @@ -493,6 +494,7 @@ func TestUpdateVReplicationWorkflow(t *testing.T) { "int64", ), fmt.Sprintf("%d", vreplID), + fmt.Sprintf("%d", vreplID+1), ) tests := []struct { @@ -504,61 +506,79 @@ func TestUpdateVReplicationWorkflow(t *testing.T) { name: "update cells", request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{ Workflow: workflow, + State: binlogdatapb.VReplicationWorkflowState(textutil.SimulatedNullInt), Cells: []string{"zone2"}, // TabletTypes is an empty value, so the current value should be cleared }, - query: fmt.Sprintf(`update _vt.vreplication set state = 'Stopped', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '' where id in (%d)`, - keyspace, shard, "zone2", vreplID), + query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '' where id in (%s)`, + keyspace, shard, "zone2", fmt.Sprintf("%d, %d", vreplID, vreplID+1)), }, { name: "update cells, NULL tablet_types", request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{ Workflow: workflow, + State: binlogdatapb.VReplicationWorkflowState(textutil.SimulatedNullInt), Cells: []string{"zone3"}, TabletTypes: []topodatapb.TabletType{topodatapb.TabletType(textutil.SimulatedNullInt)}, // So keep the current value of replica }, - query: fmt.Sprintf(`update _vt.vreplication set state = 'Stopped', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '%s' where id in (%d)`, - keyspace, shard, "zone3", tabletTypes[0], vreplID), + query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '%s' where id in (%s)`, + keyspace, shard, "zone3", tabletTypes[0], fmt.Sprintf("%d, %d", vreplID, vreplID+1)), }, { name: "update tablet_types", request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{ Workflow: workflow, + State: binlogdatapb.VReplicationWorkflowState(textutil.SimulatedNullInt), TabletSelectionPreference: tabletmanagerdatapb.TabletSelectionPreference_INORDER, TabletTypes: []topodatapb.TabletType{topodatapb.TabletType_RDONLY, topodatapb.TabletType_REPLICA}, }, - query: fmt.Sprintf(`update _vt.vreplication set state = 'Stopped', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '', tablet_types = '%s' where id in (%d)`, - keyspace, shard, "in_order:rdonly,replica", vreplID), + query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '', tablet_types = '%s' where id in (%s)`, + keyspace, shard, "in_order:rdonly,replica", fmt.Sprintf("%d, %d", vreplID, vreplID+1)), }, { name: "update tablet_types, NULL cells", request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{ Workflow: workflow, + State: binlogdatapb.VReplicationWorkflowState(textutil.SimulatedNullInt), Cells: textutil.SimulatedNullStringSlice, // So keep the current value of zone1 TabletTypes: []topodatapb.TabletType{topodatapb.TabletType_RDONLY}, }, - query: fmt.Sprintf(`update _vt.vreplication set state = 'Stopped', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '%s' where id in (%d)`, - keyspace, shard, cells[0], "rdonly", vreplID), + query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '%s' where id in (%s)`, + keyspace, shard, cells[0], "rdonly", fmt.Sprintf("%d, %d", vreplID, vreplID+1)), }, { name: "update on_ddl", request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{ Workflow: workflow, + State: binlogdatapb.VReplicationWorkflowState(textutil.SimulatedNullInt), OnDdl: binlogdatapb.OnDDLAction_EXEC, }, - query: fmt.Sprintf(`update _vt.vreplication set state = 'Stopped', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}} on_ddl:%s', cell = '', tablet_types = '' where id in (%d)`, - keyspace, shard, binlogdatapb.OnDDLAction_EXEC.String(), vreplID), + query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}} on_ddl:%s', cell = '', tablet_types = '' where id in (%s)`, + keyspace, shard, binlogdatapb.OnDDLAction_EXEC.String(), fmt.Sprintf("%d, %d", vreplID, vreplID+1)), }, { name: "update cell,tablet_types,on_ddl", request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{ Workflow: workflow, + State: binlogdatapb.VReplicationWorkflowState(textutil.SimulatedNullInt), Cells: []string{"zone1", "zone2", "zone3"}, TabletTypes: []topodatapb.TabletType{topodatapb.TabletType_RDONLY, topodatapb.TabletType_REPLICA, topodatapb.TabletType_PRIMARY}, OnDdl: binlogdatapb.OnDDLAction_EXEC_IGNORE, }, - query: fmt.Sprintf(`update _vt.vreplication set state = 'Stopped', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}} on_ddl:%s', cell = '%s', tablet_types = '%s' where id in (%d)`, - keyspace, shard, binlogdatapb.OnDDLAction_EXEC_IGNORE.String(), "zone1,zone2,zone3", "rdonly,replica,primary", vreplID), + query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}} on_ddl:%s', cell = '%s', tablet_types = '%s' where id in (%s)`, + keyspace, shard, binlogdatapb.OnDDLAction_EXEC_IGNORE.String(), "zone1,zone2,zone3", "rdonly,replica,primary", fmt.Sprintf("%d, %d", vreplID, vreplID+1)), + }, + { + name: "update state", + request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{ + Workflow: workflow, + State: binlogdatapb.VReplicationWorkflowState_Stopped, + Cells: textutil.SimulatedNullStringSlice, + TabletTypes: []topodatapb.TabletType{topodatapb.TabletType(textutil.SimulatedNullInt)}, + OnDdl: binlogdatapb.OnDDLAction(textutil.SimulatedNullInt), + }, + query: fmt.Sprintf(`update _vt.vreplication set state = '%s', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '%s' where id in (%s)`, + binlogdatapb.VReplicationWorkflowState_Stopped.String(), keyspace, shard, cells[0], tabletTypes[0], fmt.Sprintf("%d, %d", vreplID, vreplID+1)), }, } @@ -575,8 +595,6 @@ func TestUpdateVReplicationWorkflow(t *testing.T) { require.NotNil(t, tt.request, "No request provided") require.NotEqual(t, "", tt.query, "No expected query provided") - tt.request.State = binlogdatapb.VReplicationWorkflowState_Stopped - // These are the same for each RPC call. tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(fmt.Sprintf("use %s", sidecar.DefaultName), &sqltypes.Result{}, nil) tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(selectQuery, selectRes, nil)