diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index 37ad61922ce..a7a1958a9eb 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -2153,7 +2153,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla case vReplicationWorkflowActionProgress: return printCopyProgress() case vReplicationWorkflowActionCreate: - err = wf.Create() + err = wf.Create(ctx) if err != nil { return err } @@ -2783,7 +2783,7 @@ func commandValidateSchemaShard(ctx context.Context, wr *wrangler.Wrangler, subF if *excludeTables != "" { excludeTableArray = strings.Split(*excludeTables, ",") } - return wr.ValidateSchemaShard(ctx, keyspace, shard, excludeTableArray, *includeViews) + return wr.ValidateSchemaShard(ctx, keyspace, shard, excludeTableArray, *includeViews, false /*includeVSchema*/) } func commandValidateSchemaKeyspace(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { diff --git a/go/vt/vtctld/vtctld.go b/go/vt/vtctld/vtctld.go index 784f707c0e5..e60a05f412e 100644 --- a/go/vt/vtctld/vtctld.go +++ b/go/vt/vtctld/vtctld.go @@ -81,7 +81,7 @@ func InitVtctld(ts *topo.Server) { actionRepo.RegisterShardAction("ValidateSchemaShard", func(ctx context.Context, wr *wrangler.Wrangler, keyspace, shard string) (string, error) { - return "", wr.ValidateSchemaShard(ctx, keyspace, shard, nil, false) + return "", wr.ValidateSchemaShard(ctx, keyspace, shard, nil, false, false /*includeVSchema*/) }) actionRepo.RegisterShardAction("ValidateVersionShard", diff --git a/go/vt/wrangler/schema.go b/go/vt/wrangler/schema.go index c68741430d9..f1c8e9573f7 100644 --- a/go/vt/wrangler/schema.go +++ b/go/vt/wrangler/schema.go @@ -143,7 +143,7 @@ func (wr *Wrangler) diffSchema(ctx context.Context, masterSchema *tabletmanagerd } // ValidateSchemaShard will diff the schema from all the tablets in the shard. -func (wr *Wrangler) ValidateSchemaShard(ctx context.Context, keyspace, shard string, excludeTables []string, includeViews bool) error { +func (wr *Wrangler) ValidateSchemaShard(ctx context.Context, keyspace, shard string, excludeTables []string, includeViews bool, includeVSchema bool) error { si, err := wr.ts.GetShard(ctx, keyspace, shard) if err != nil { return fmt.Errorf("GetShard(%v, %v) failed: %v", keyspace, shard, err) @@ -159,6 +159,24 @@ func (wr *Wrangler) ValidateSchemaShard(ctx context.Context, keyspace, shard str return fmt.Errorf("GetSchema(%v, nil, %v, %v) failed: %v", si.MasterAlias, excludeTables, includeViews, err) } + if includeVSchema { + vschm, err := wr.ts.GetVSchema(ctx, keyspace) + if err != nil { + return fmt.Errorf("GetVSchema(%s) failed: %v", keyspace, err) + } + notFoundTables := []string{} + + for _, tableDef := range masterSchema.TableDefinitions { + if _, ok := vschm.Tables[tableDef.Name]; !ok { + notFoundTables = append(notFoundTables, tableDef.Name) + } + } + + if len(notFoundTables) > 0 { + return fmt.Errorf("Vschema Validation Failed: the following tables were not found in the vschema %v", notFoundTables) + } + } + // read all the aliases in the shard, that is all tablets that are // replicating from the master aliases, err := wr.ts.FindAllTabletAliasesInShard(ctx, keyspace, shard) @@ -199,7 +217,7 @@ func (wr *Wrangler) ValidateSchemaKeyspace(ctx context.Context, keyspace string, } sort.Strings(shards) if len(shards) == 1 { - return wr.ValidateSchemaShard(ctx, keyspace, shards[0], excludeTables, includeViews) + return wr.ValidateSchemaShard(ctx, keyspace, shards[0], excludeTables, includeViews, false /*includeVSchema*/) } var referenceSchema *tabletmanagerdatapb.SchemaDefinition diff --git a/go/vt/wrangler/schema_test.go b/go/vt/wrangler/schema_test.go new file mode 100644 index 00000000000..a82e2ecd056 --- /dev/null +++ b/go/vt/wrangler/schema_test.go @@ -0,0 +1,75 @@ +/* +Copyright 2020 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package wrangler + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/sqltypes" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" +) + +func TestValidateSchemaShard(t *testing.T) { + ctx := context.Background() + sourceShards := []string{"-80", "80-"} + targetShards := []string{"-40", "40-80", "80-c0", "c0-"} + + tme := newTestShardMigrater(ctx, t, sourceShards, targetShards) + + schm := &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ + Name: "not_in_vschema", + Columns: []string{"c1", "c2"}, + PrimaryKeyColumns: []string{"c1"}, + Fields: sqltypes.MakeTestFields("c1|c2", "int64|int64"), + }}, + } + + // This is the vschema returned by newTestShardMigrater + schm2 := &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ + { + Name: "t1", + Columns: []string{"c1"}, + }, + { + Name: "t2", + Columns: []string{"c1"}, + }, + { + Name: "t3", + Columns: []string{"c1"}, + }, + }, + } + + for _, primary := range tme.sourceMasters { + if primary.Tablet.Shard == "80-" { + primary.FakeMysqlDaemon.Schema = schm + } else { + primary.FakeMysqlDaemon.Schema = schm2 + } + } + + err := tme.wr.ValidateSchemaShard(ctx, "ks", "-80", nil /*excludeTables*/, true /*includeViews*/, true /*includeVSchema*/) + require.NoError(t, err) + shouldErr := tme.wr.ValidateSchemaShard(ctx, "ks", "80-", nil /*excludeTables*/, true /*includeViews*/, true /*includeVSchema*/) + require.Contains(t, shouldErr.Error(), "Vschema Validation Failed:") +} diff --git a/go/vt/wrangler/workflow.go b/go/vt/wrangler/workflow.go index c08a6e50ac8..2c2e57be595 100644 --- a/go/vt/wrangler/workflow.go +++ b/go/vt/wrangler/workflow.go @@ -162,7 +162,7 @@ func (vrw *VReplicationWorkflow) stateAsString(ws *workflowState) string { } // Create initiates a workflow -func (vrw *VReplicationWorkflow) Create() error { +func (vrw *VReplicationWorkflow) Create(ctx context.Context) error { var err error if vrw.Exists() { return fmt.Errorf("workflow already exists") @@ -174,6 +174,22 @@ func (vrw *VReplicationWorkflow) Create() error { case MoveTablesWorkflow, MigrateWorkflow: err = vrw.initMoveTables() case ReshardWorkflow: + excludeTables := strings.Split(vrw.params.ExcludeTables, ",") + keyspace := vrw.params.SourceKeyspace + + errs := []string{} + for _, shard := range vrw.params.SourceShards { + if err := vrw.wr.ValidateSchemaShard(ctx, keyspace, shard, excludeTables, true /*includeViews*/, true /*includeVschema*/); err != nil { + errMsg := fmt.Sprintf("%s/%s: %s", keyspace, shard, err.Error()) + errs = append(errs, errMsg) + } + } + + // There were some schema drifts + if len(errs) > 0 { + return fmt.Errorf("Create ReshardWorkflow failed Schema Validation:\n" + strings.Join(errs, "\n")) + } + err = vrw.initReshard() default: return fmt.Errorf("unknown workflow type %d", vrw.workflowType) diff --git a/go/vt/wrangler/workflow_test.go b/go/vt/wrangler/workflow_test.go index 849ffffb890..521e3f5e882 100644 --- a/go/vt/wrangler/workflow_test.go +++ b/go/vt/wrangler/workflow_test.go @@ -27,6 +27,7 @@ import ( "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/log" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" "vitess.io/vitess/go/vt/proto/topodata" ) @@ -383,6 +384,42 @@ func TestReshardV2(t *testing.T) { require.NotNil(t, si) } +func TestVRWSchemaValidation(t *testing.T) { + ctx := context.Background() + sourceShards := []string{"-80", "80-"} + targetShards := []string{"-40", "40-80", "80-c0", "c0-"} + p := &VReplicationWorkflowParams{ + Workflow: "test", + SourceKeyspace: "ks", + TargetKeyspace: "ks", + SourceShards: sourceShards, + TargetShards: targetShards, + Cells: "cell1,cell2", + TabletTypes: "replica,rdonly,master", + Timeout: DefaultActionTimeout, + } + schm := &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ + Name: "not_in_vschema", + Columns: []string{"c1", "c2"}, + PrimaryKeyColumns: []string{"c1"}, + Fields: sqltypes.MakeTestFields("c1|c2", "int64|int64"), + }}, + } + tme := newTestShardMigrater(ctx, t, sourceShards, targetShards) + for _, primary := range tme.sourceMasters { + primary.FakeMysqlDaemon.Schema = schm + } + + defer tme.stopTablets(t) + vrwf, err := tme.wr.NewVReplicationWorkflow(ctx, ReshardWorkflow, p) + vrwf.ws = nil + require.NoError(t, err) + require.NotNil(t, vrwf) + shouldErr := vrwf.Create(ctx) + require.Contains(t, shouldErr.Error(), "Create ReshardWorkflow failed Schema Validation") +} + func TestReshardV2Cancel(t *testing.T) { ctx := context.Background() sourceShards := []string{"-40", "40-"}