diff --git a/go/vt/vtctl/grpcvtctldserver/server.go b/go/vt/vtctl/grpcvtctldserver/server.go index b939d836964..90899174a22 100644 --- a/go/vt/vtctl/grpcvtctldserver/server.go +++ b/go/vt/vtctl/grpcvtctldserver/server.go @@ -882,21 +882,15 @@ func (s *VtctldServer) GetSchema(ctx context.Context, req *vtctldatapb.GetSchema defer span.Finish() span.Annotate("tablet_alias", topoproto.TabletAliasString(req.TabletAlias)) - - tablet, err := s.ts.GetTablet(ctx, req.TabletAlias) - if err != nil { - return nil, fmt.Errorf("GetTablet(%v) failed: %w", req.TabletAlias, err) - } - span.Annotate("tables", strings.Join(req.Tables, ",")) span.Annotate("exclude_tables", strings.Join(req.ExcludeTables, ",")) span.Annotate("include_views", req.IncludeViews) span.Annotate("table_names_only", req.TableNamesOnly) span.Annotate("table_sizes_only", req.TableSizesOnly) - sd, err := s.tmc.GetSchema(ctx, tablet.Tablet, req.Tables, req.ExcludeTables, req.IncludeViews) + sd, err := schematools.GetSchema(ctx, s.ts, s.tmc, req.TabletAlias, req.Tables, req.ExcludeTables, req.IncludeViews) if err != nil { - return nil, fmt.Errorf("GetSchema(%v, %v, %v, %v) failed: %w", tablet.Tablet, req.Tables, req.ExcludeTables, req.IncludeViews, err) + return nil, err } if req.TableNamesOnly { diff --git a/go/vt/vtctl/schematools/copy.go b/go/vt/vtctl/schematools/copy.go new file mode 100644 index 00000000000..7fb9662235f --- /dev/null +++ b/go/vt/vtctl/schematools/copy.go @@ -0,0 +1,91 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package schematools + +import ( + "bytes" + "context" + "fmt" + + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/vttablet/tmclient" + + topodatapb "vitess.io/vitess/go/vt/proto/topodata" +) + +// CopyShardMetadata copies the contents of the _vt.shard_metadata table from +// the source tablet to the destination tablet. +// +// NOTE: This function assumes that the destination tablet is a primary with +// binary logging enabled, in order to propagate the INSERT statements to any +// replicas in the destination shard. +func CopyShardMetadata(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, source *topodatapb.TabletAlias, dest *topodatapb.TabletAlias) error { + sourceTablet, err := ts.GetTablet(ctx, source) + if err != nil { + return fmt.Errorf("GetTablet(%v) failed: %w", topoproto.TabletAliasString(source), err) + } + + destTablet, err := ts.GetTablet(ctx, dest) + if err != nil { + return fmt.Errorf("GetTablet(%v) failed: %w", topoproto.TabletAliasString(dest), err) + } + + sql := "SELECT 1 FROM information_schema.tables WHERE table_schema = '_vt' AND table_name = 'shard_metadata'" + presenceResult, err := tmc.ExecuteFetchAsDba(ctx, sourceTablet.Tablet, false, []byte(sql), 1, false, false) + if err != nil { + return fmt.Errorf("ExecuteFetchAsDba(%v, false, %v, 1, false, false) failed: %v", topoproto.TabletAliasString(source), sql, err) + } + if len(presenceResult.Rows) == 0 { + log.Infof("_vt.shard_metadata doesn't exist on the source tablet %v, skipping its copy.", topoproto.TabletAliasString(source)) + return nil + } + + // (TODO|@ajm188,@deepthi): 100 may be too low here for row limit + sql = "SELECT db_name, name, value FROM _vt.shard_metadata" + p3qr, err := tmc.ExecuteFetchAsDba(ctx, sourceTablet.Tablet, false, []byte(sql), 100, false, false) + if err != nil { + return fmt.Errorf("ExecuteFetchAsDba(%v, false, %v, 100, false, false) failed: %v", topoproto.TabletAliasString(source), sql, err) + } + + qr := sqltypes.Proto3ToResult(p3qr) + queryBuf := bytes.NewBuffer(nil) + for _, row := range qr.Rows { + dbName := row[0] + name := row[1] + value := row[2] + queryBuf.WriteString("INSERT INTO _vt.shard_metadata (db_name, name, value) VALUES (") + dbName.EncodeSQL(queryBuf) + queryBuf.WriteByte(',') + name.EncodeSQL(queryBuf) + queryBuf.WriteByte(',') + value.EncodeSQL(queryBuf) + queryBuf.WriteString(") ON DUPLICATE KEY UPDATE value = ") + value.EncodeSQL(queryBuf) + + _, err := tmc.ExecuteFetchAsDba(ctx, destTablet.Tablet, false, queryBuf.Bytes(), 0, false, false) + if err != nil { + return fmt.Errorf("ExecuteFetchAsDba(%v, false, %v, 0, false, false) failed: %v", topoproto.TabletAliasString(dest), queryBuf.String(), err) + } + + queryBuf.Reset() + } + + return nil +} diff --git a/go/vt/vtctl/schematools/diff.go b/go/vt/vtctl/schematools/diff.go new file mode 100644 index 00000000000..2d3c8bf018e --- /dev/null +++ b/go/vt/vtctl/schematools/diff.go @@ -0,0 +1,55 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package schematools + +import ( + "context" + "fmt" + + "vitess.io/vitess/go/vt/mysqlctl/tmutils" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vttablet/tmclient" + + topodatapb "vitess.io/vitess/go/vt/proto/topodata" +) + +// CompareSchemas returns (nil, nil) if the schema of the two tablets match. If +// there are diffs, they are returned as (diffs []string, nil). +// +// If fetching the schema for either tablet fails, a non-nil error is returned. +func CompareSchemas( + ctx context.Context, + ts *topo.Server, + tmc tmclient.TabletManagerClient, + source *topodatapb.TabletAlias, + dest *topodatapb.TabletAlias, + tables []string, + excludeTables []string, + includeViews bool, +) (diffs []string, err error) { + sourceSchema, err := GetSchema(ctx, ts, tmc, source, tables, excludeTables, includeViews) + if err != nil { + return nil, fmt.Errorf("failed to get schema from tablet %v. err: %v", source, err) + } + + destSchema, err := GetSchema(ctx, ts, tmc, dest, tables, excludeTables, includeViews) + if err != nil { + return nil, fmt.Errorf("failed to get schema from tablet %v. err: %v", dest, err) + } + + return tmutils.DiffSchemaToArray("source", sourceSchema, "dest", destSchema), nil +} diff --git a/go/vt/vtctl/schematools/schematools.go b/go/vt/vtctl/schematools/schematools.go new file mode 100644 index 00000000000..41e6f4c5dc5 --- /dev/null +++ b/go/vt/vtctl/schematools/schematools.go @@ -0,0 +1,45 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package schematools + +import ( + "context" + + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/tmclient" + + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/proto/vtrpc" +) + +// GetSchema makes an RPC to get the schema from a remote tablet, after +// verifying a tablet with that alias exists in the topo. +func GetSchema(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, alias *topodatapb.TabletAlias, tables []string, excludeTables []string, includeViews bool) (*tabletmanagerdatapb.SchemaDefinition, error) { + ti, err := ts.GetTablet(ctx, alias) + if err != nil { + return nil, vterrors.Errorf(vtrpc.Code_NOT_FOUND, "GetTablet(%v) failed: %v", alias, err) + } + + sd, err := tmc.GetSchema(ctx, ti.Tablet, tables, excludeTables, includeViews) + if err != nil { + return nil, vterrors.Wrapf(err, "GetSchema(%v, %v, %v, %v) failed: %v", ti.Tablet, tables, excludeTables, includeViews, err) + } + + return sd, nil +} diff --git a/go/vt/worker/multi_split_diff.go b/go/vt/worker/multi_split_diff.go index d1749302788..cf6a6cfb939 100644 --- a/go/vt/worker/multi_split_diff.go +++ b/go/vt/worker/multi_split_diff.go @@ -17,29 +17,28 @@ limitations under the License. package worker import ( + "context" "fmt" "html/template" "sort" "sync" "time" - "context" - - "vitess.io/vitess/go/vt/proto/vtrpc" - "vitess.io/vitess/go/vt/vttablet/queryservice" - "vitess.io/vitess/go/vt/vttablet/tabletconn" - + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/mysqlctl/tmutils" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vtctl/schematools" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vtgate/vindexes" + "vitess.io/vitess/go/vt/vttablet/queryservice" + "vitess.io/vitess/go/vt/vttablet/tabletconn" "vitess.io/vitess/go/vt/wrangler" - "vitess.io/vitess/go/sqltypes" - "vitess.io/vitess/go/vt/binlog/binlogplayer" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" - "vitess.io/vitess/go/vt/vterrors" - "vitess.io/vitess/go/vt/vtgate/vindexes" + "vitess.io/vitess/go/vt/proto/vtrpc" ) // Scanners encapsulates a source and a destination. We create one of these per parallel runner. @@ -761,8 +760,8 @@ func (msdw *MultiSplitDiffWorker) gatherSchemaInfo(ctx context.Context) ([]*tabl go func(i int, destinationAlias *topodatapb.TabletAlias) { var err error shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) - destinationSchemaDefinition, err := msdw.wr.GetSchema( - shortCtx, destinationAlias, nil /* tables */, msdw.excludeTables, false /* includeViews */) + destinationSchemaDefinition, err := schematools.GetSchema( + shortCtx, msdw.wr.TopoServer(), msdw.wr.TabletManagerClient(), destinationAlias, nil /* tables */, msdw.excludeTables, false /* includeViews */) cancel() if err != nil { msdw.markAsWillFail(rec, err) @@ -776,8 +775,8 @@ func (msdw *MultiSplitDiffWorker) gatherSchemaInfo(ctx context.Context) ([]*tabl go func() { var err error shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) - sourceSchemaDefinition, err = msdw.wr.GetSchema( - shortCtx, msdw.sourceAlias, nil /* tables */, msdw.excludeTables, false /* includeViews */) + sourceSchemaDefinition, err = schematools.GetSchema( + shortCtx, msdw.wr.TopoServer(), msdw.wr.TabletManagerClient(), msdw.sourceAlias, nil /* tables */, msdw.excludeTables, false /* includeViews */) cancel() if err != nil { msdw.markAsWillFail(rec, err) diff --git a/go/vt/worker/split_clone.go b/go/vt/worker/split_clone.go index f5ba41a35c9..dd658e6e9e8 100644 --- a/go/vt/worker/split_clone.go +++ b/go/vt/worker/split_clone.go @@ -17,24 +17,23 @@ limitations under the License. package worker import ( + "context" "fmt" "html/template" "strings" "sync" "time" - "context" - "vitess.io/vitess/go/event" "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/discovery" - "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/throttler" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/topotools" + "vitess.io/vitess/go/vt/vtctl/schematools" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/vindexes" "vitess.io/vitess/go/vt/vttablet/tabletconn" @@ -44,6 +43,7 @@ import ( binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/proto/vtrpc" ) // cloneType specifies whether it is a horizontal resharding or a vertical split. @@ -1320,7 +1320,7 @@ func (scw *SplitCloneWorker) getSourceSchema(ctx context.Context, tablet *topoda // in each source shard for each table to be about the same // (rowCount is used to estimate an ETA) shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) - sourceSchemaDefinition, err := scw.wr.GetSchema(shortCtx, tablet.Alias, scw.tables, scw.excludeTables, false /* includeViews */) + sourceSchemaDefinition, err := schematools.GetSchema(shortCtx, scw.wr.TopoServer(), scw.wr.TabletManagerClient(), tablet.Alias, scw.tables, scw.excludeTables, false /* includeViews */) cancel() if err != nil { return nil, vterrors.Wrapf(err, "cannot get schema from source %v", topoproto.TabletAliasString(tablet.Alias)) diff --git a/go/vt/worker/split_diff.go b/go/vt/worker/split_diff.go index 14ce0377c7f..47d7cd756d6 100644 --- a/go/vt/worker/split_diff.go +++ b/go/vt/worker/split_diff.go @@ -17,15 +17,11 @@ limitations under the License. package worker import ( + "context" "html/template" "sort" "sync" - "vitess.io/vitess/go/vt/proto/vtrpc" - "vitess.io/vitess/go/vt/vterrors" - - "context" - "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/vt/binlog/binlogplayer" @@ -33,11 +29,14 @@ import ( "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/mysqlctl/tmutils" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vtctl/schematools" + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/vindexes" "vitess.io/vitess/go/vt/wrangler" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/proto/vtrpc" ) // SplitDiffWorker executes a diff between a destination shard and its @@ -404,8 +403,8 @@ func (sdw *SplitDiffWorker) diff(ctx context.Context) error { go func() { var err error shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) - sdw.destinationSchemaDefinition, err = sdw.wr.GetSchema( - shortCtx, sdw.destinationAlias, nil /* tables */, sdw.excludeTables, false /* includeViews */) + sdw.destinationSchemaDefinition, err = schematools.GetSchema( + shortCtx, sdw.wr.TopoServer(), sdw.wr.TabletManagerClient(), sdw.destinationAlias, nil /* tables */, sdw.excludeTables, false /* includeViews */) cancel() if err != nil { sdw.markAsWillFail(rec, err) @@ -417,8 +416,8 @@ func (sdw *SplitDiffWorker) diff(ctx context.Context) error { go func() { var err error shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) - sdw.sourceSchemaDefinition, err = sdw.wr.GetSchema( - shortCtx, sdw.sourceAlias, nil /* tables */, sdw.excludeTables, false /* includeViews */) + sdw.sourceSchemaDefinition, err = schematools.GetSchema( + shortCtx, sdw.wr.TopoServer(), sdw.wr.TabletManagerClient(), sdw.sourceAlias, nil /* tables */, sdw.excludeTables, false /* includeViews */) cancel() if err != nil { sdw.markAsWillFail(rec, err) diff --git a/go/vt/worker/vertical_split_diff.go b/go/vt/worker/vertical_split_diff.go index b857ca55126..167acfaadaf 100644 --- a/go/vt/worker/vertical_split_diff.go +++ b/go/vt/worker/vertical_split_diff.go @@ -17,14 +17,11 @@ limitations under the License. package worker import ( + "context" "fmt" "html/template" "sync" - "vitess.io/vitess/go/vt/vterrors" - - "context" - "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/vt/binlog/binlogplayer" @@ -32,6 +29,8 @@ import ( "vitess.io/vitess/go/vt/mysqlctl/tmutils" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/vtctl/schematools" + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/wrangler" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" @@ -366,8 +365,8 @@ func (vsdw *VerticalSplitDiffWorker) diff(ctx context.Context) error { go func() { var err error shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) - vsdw.destinationSchemaDefinition, err = vsdw.wr.GetSchema( - shortCtx, vsdw.destinationAlias, vsdw.shardInfo.SourceShards[0].Tables, nil /* excludeTables */, false /* includeViews */) + vsdw.destinationSchemaDefinition, err = schematools.GetSchema( + shortCtx, vsdw.wr.TopoServer(), vsdw.wr.TabletManagerClient(), vsdw.destinationAlias, vsdw.shardInfo.SourceShards[0].Tables, nil /* excludeTables */, false /* includeViews */) cancel() if err != nil { vsdw.markAsWillFail(rec, err) @@ -379,8 +378,8 @@ func (vsdw *VerticalSplitDiffWorker) diff(ctx context.Context) error { go func() { var err error shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) - vsdw.sourceSchemaDefinition, err = vsdw.wr.GetSchema( - shortCtx, vsdw.sourceAlias, vsdw.shardInfo.SourceShards[0].Tables, nil /* excludeTables */, false /* includeViews */) + vsdw.sourceSchemaDefinition, err = schematools.GetSchema( + shortCtx, vsdw.wr.TopoServer(), vsdw.wr.TabletManagerClient(), vsdw.sourceAlias, vsdw.shardInfo.SourceShards[0].Tables, nil /* excludeTables */, false /* includeViews */) cancel() if err != nil { vsdw.markAsWillFail(rec, err) diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index f30da0cd79d..6fefdafea45 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -17,6 +17,7 @@ limitations under the License. package wrangler import ( + "context" "fmt" "hash/fnv" "math" @@ -27,29 +28,28 @@ import ( "google.golang.org/protobuf/proto" - "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/mysqlctl/tmutils" - querypb "vitess.io/vitess/go/vt/proto/query" - "vitess.io/vitess/go/vt/schema" - "vitess.io/vitess/go/vt/topotools" - "vitess.io/vitess/go/vt/vtctl/workflow" - "vitess.io/vitess/go/vt/vtgate/evalengine" - - "context" - "vitess.io/vitess/go/json2" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/key" - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" - vschemapb "vitess.io/vitess/go/vt/proto/vschema" - vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/mysqlctl/tmutils" + "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topotools" + "vitess.io/vitess/go/vt/vtctl/schematools" + "vitess.io/vitess/go/vt/vtctl/workflow" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vtgate/evalengine" "vitess.io/vitess/go/vt/vtgate/vindexes" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" + vschemapb "vitess.io/vitess/go/vt/proto/vschema" + vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" ) type materializer struct { @@ -525,7 +525,7 @@ func (wr *Wrangler) prepareCreateLookup(ctx context.Context, keyspace string, sp if onesource.PrimaryAlias == nil { return nil, nil, nil, fmt.Errorf("source shard has no primary: %v", onesource.ShardName()) } - tableSchema, err := wr.GetSchema(ctx, onesource.PrimaryAlias, []string{sourceTableName}, nil, false) + tableSchema, err := schematools.GetSchema(ctx, wr.ts, wr.tmc, onesource.PrimaryAlias, []string{sourceTableName}, nil, false) if err != nil { return nil, nil, nil, err } @@ -922,7 +922,7 @@ func (mz *materializer) deploySchema(ctx context.Context) error { allTables := []string{"/.*/"} hasTargetTable := map[string]bool{} - targetSchema, err := mz.wr.GetSchema(ctx, target.PrimaryAlias, allTables, nil, false) + targetSchema, err := schematools.GetSchema(ctx, mz.wr.ts, mz.wr.tmc, target.PrimaryAlias, allTables, nil, false) if err != nil { return err } diff --git a/go/vt/wrangler/schema.go b/go/vt/wrangler/schema.go index fb2bacef432..4a77738f154 100644 --- a/go/vt/wrangler/schema.go +++ b/go/vt/wrangler/schema.go @@ -18,15 +18,13 @@ package wrangler import ( "bytes" + "context" "fmt" "html/template" "sort" "sync" "time" - "context" - - "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" @@ -34,6 +32,7 @@ import ( "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/vtctl/schematools" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" @@ -45,21 +44,11 @@ const ( DefaultWaitReplicasTimeout = 10 * time.Second ) -// GetSchema uses an RPC to get the schema from a remote tablet -func (wr *Wrangler) GetSchema(ctx context.Context, tabletAlias *topodatapb.TabletAlias, tables, excludeTables []string, includeViews bool) (*tabletmanagerdatapb.SchemaDefinition, error) { - ti, err := wr.ts.GetTablet(ctx, tabletAlias) - if err != nil { - return nil, fmt.Errorf("GetTablet(%v) failed: %v", tabletAlias, err) - } - - return wr.tmc.GetSchema(ctx, ti.Tablet, tables, excludeTables, includeViews) -} - // helper method to asynchronously diff a schema func (wr *Wrangler) diffSchema(ctx context.Context, primarySchema *tabletmanagerdatapb.SchemaDefinition, primaryTabletAlias, alias *topodatapb.TabletAlias, excludeTables []string, includeViews bool, wg *sync.WaitGroup, er concurrency.ErrorRecorder) { defer wg.Done() log.Infof("Gathering schema for %v", topoproto.TabletAliasString(alias)) - replicaSchema, err := wr.GetSchema(ctx, alias, nil, excludeTables, includeViews) + replicaSchema, err := schematools.GetSchema(ctx, wr.ts, wr.tmc, alias, nil, excludeTables, includeViews) if err != nil { er.RecordError(fmt.Errorf("GetSchema(%v, nil, %v, %v) failed: %v", alias, excludeTables, includeViews, err)) return @@ -81,7 +70,7 @@ func (wr *Wrangler) ValidateSchemaShard(ctx context.Context, keyspace, shard str return fmt.Errorf("no primary in shard %v/%v", keyspace, shard) } log.Infof("Gathering schema for primary %v", topoproto.TabletAliasString(si.PrimaryAlias)) - primarySchema, err := wr.GetSchema(ctx, si.PrimaryAlias, nil, excludeTables, includeViews) + primarySchema, err := schematools.GetSchema(ctx, wr.ts, wr.tmc, si.PrimaryAlias, nil, excludeTables, includeViews) if err != nil { return fmt.Errorf("GetSchema(%v, nil, %v, %v) failed: %v", si.PrimaryAlias, excludeTables, includeViews, err) } @@ -170,7 +159,7 @@ func (wr *Wrangler) ValidateSchemaKeyspace(ctx context.Context, keyspace string, if referenceSchema == nil { referenceAlias = si.PrimaryAlias log.Infof("Gathering schema for reference primary %v", topoproto.TabletAliasString(referenceAlias)) - referenceSchema, err = wr.GetSchema(ctx, referenceAlias, nil, excludeTables, includeViews) + referenceSchema, err = schematools.GetSchema(ctx, wr.ts, wr.tmc, referenceAlias, nil, excludeTables, includeViews) if err != nil { return fmt.Errorf("GetSchema(%v, nil, %v, %v) failed: %v", referenceAlias, excludeTables, includeViews, err) } @@ -218,7 +207,7 @@ func (wr *Wrangler) ValidateVSchema(ctx context.Context, keyspace string, shards shardFailures.RecordError(fmt.Errorf("GetShard(%v, %v) failed: %v", keyspace, shard, err)) return } - primarySchema, err := wr.GetSchema(ctx, si.PrimaryAlias, nil, excludeTables, includeViews) + primarySchema, err := schematools.GetSchema(ctx, wr.ts, wr.tmc, si.PrimaryAlias, nil, excludeTables, includeViews) if err != nil { shardFailures.RecordError(fmt.Errorf("GetSchema(%s, nil, %v, %v) (%v/%v) failed: %v", si.PrimaryAlias.String(), excludeTables, includeViews, keyspace, shard, err, @@ -284,12 +273,12 @@ func (wr *Wrangler) CopySchemaShard(ctx context.Context, sourceTabletAlias *topo return fmt.Errorf("no primary in shard record %v/%v. Consider running 'vtctl InitShardPrimary' in case of a new shard or reparenting the shard to fix the topology data", destKeyspace, destShard) } - err = wr.copyShardMetadata(ctx, sourceTabletAlias, destShardInfo.PrimaryAlias) + err = schematools.CopyShardMetadata(ctx, wr.ts, wr.tmc, sourceTabletAlias, destShardInfo.PrimaryAlias) if err != nil { return fmt.Errorf("copyShardMetadata(%v, %v) failed: %v", sourceTabletAlias, destShardInfo.PrimaryAlias, err) } - diffs, err := wr.compareSchemas(ctx, sourceTabletAlias, destShardInfo.PrimaryAlias, tables, excludeTables, includeViews) + diffs, err := schematools.CompareSchemas(ctx, wr.ts, wr.tmc, sourceTabletAlias, destShardInfo.PrimaryAlias, tables, excludeTables, includeViews) if err != nil { return fmt.Errorf("CopySchemaShard failed because schemas could not be compared initially: %v", err) } @@ -298,7 +287,7 @@ func (wr *Wrangler) CopySchemaShard(ctx context.Context, sourceTabletAlias *topo return nil } - sourceSd, err := wr.GetSchema(ctx, sourceTabletAlias, tables, excludeTables, includeViews) + sourceSd, err := schematools.GetSchema(ctx, wr.ts, wr.tmc, sourceTabletAlias, tables, excludeTables, includeViews) if err != nil { return fmt.Errorf("GetSchema(%v, %v, %v, %v) failed: %v", sourceTabletAlias, tables, excludeTables, includeViews, err) } @@ -330,7 +319,7 @@ func (wr *Wrangler) CopySchemaShard(ctx context.Context, sourceTabletAlias *topo // statement. We want to fail early in this case because vtworker SplitDiff // fails in case of such an inconsistency as well. if !skipVerify { - diffs, err = wr.compareSchemas(ctx, sourceTabletAlias, destShardInfo.PrimaryAlias, tables, excludeTables, includeViews) + diffs, err = schematools.CompareSchemas(ctx, wr.ts, wr.tmc, sourceTabletAlias, destShardInfo.PrimaryAlias, tables, excludeTables, includeViews) if err != nil { return fmt.Errorf("CopySchemaShard failed because schemas could not be compared finally: %v", err) } @@ -357,64 +346,6 @@ func (wr *Wrangler) CopySchemaShard(ctx context.Context, sourceTabletAlias *topo return err } -// copyShardMetadata copies contents of _vt.shard_metadata table from the source -// tablet to the destination tablet. It's assumed that destination tablet is a -// primary and binlogging is not turned off when INSERT statements are executed. -func (wr *Wrangler) copyShardMetadata(ctx context.Context, srcTabletAlias *topodatapb.TabletAlias, destTabletAlias *topodatapb.TabletAlias) error { - sql := "SELECT 1 FROM information_schema.tables WHERE table_schema = '_vt' AND table_name = 'shard_metadata'" - presenceResult, err := wr.ExecuteFetchAsDba(ctx, srcTabletAlias, sql, 1, false, false) - if err != nil { - return fmt.Errorf("ExecuteFetchAsDba(%v, %v, 1, false, false) failed: %v", srcTabletAlias, sql, err) - } - if len(presenceResult.Rows) == 0 { - log.Infof("_vt.shard_metadata doesn't exist on the source tablet %v, skipping its copy.", topoproto.TabletAliasString(srcTabletAlias)) - return nil - } - - // TODO: 100 may be too low here for row limit - sql = "SELECT db_name, name, value FROM _vt.shard_metadata" - dataProto, err := wr.ExecuteFetchAsDba(ctx, srcTabletAlias, sql, 100, false, false) - if err != nil { - return fmt.Errorf("ExecuteFetchAsDba(%v, %v, 100, false, false) failed: %v", srcTabletAlias, sql, err) - } - data := sqltypes.Proto3ToResult(dataProto) - for _, row := range data.Rows { - dbName := row[0] - name := row[1] - value := row[2] - queryBuf := bytes.Buffer{} - queryBuf.WriteString("INSERT INTO _vt.shard_metadata (db_name, name, value) VALUES (") - dbName.EncodeSQL(&queryBuf) - queryBuf.WriteByte(',') - name.EncodeSQL(&queryBuf) - queryBuf.WriteByte(',') - value.EncodeSQL(&queryBuf) - queryBuf.WriteString(") ON DUPLICATE KEY UPDATE value = ") - value.EncodeSQL(&queryBuf) - - _, err := wr.ExecuteFetchAsDba(ctx, destTabletAlias, queryBuf.String(), 0, false, false) - if err != nil { - return fmt.Errorf("ExecuteFetchAsDba(%v, %v, 0, false, false) failed: %v", destTabletAlias, queryBuf.String(), err) - } - } - return nil -} - -// compareSchemas returns nil if the schema of the two tablets referenced by -// "sourceAlias" and "destAlias" are identical. Otherwise, the difference is -// returned as []string. -func (wr *Wrangler) compareSchemas(ctx context.Context, sourceAlias, destAlias *topodatapb.TabletAlias, tables, excludeTables []string, includeViews bool) ([]string, error) { - sourceSd, err := wr.GetSchema(ctx, sourceAlias, tables, excludeTables, includeViews) - if err != nil { - return nil, fmt.Errorf("failed to get schema from tablet %v. err: %v", sourceAlias, err) - } - destSd, err := wr.GetSchema(ctx, destAlias, tables, excludeTables, includeViews) - if err != nil { - return nil, fmt.Errorf("failed to get schema from tablet %v. err: %v", destAlias, err) - } - return tmutils.DiffSchemaToArray("source", sourceSd, "dest", destSd), nil -} - // applySQLShard applies a given SQL change on a given tablet alias. It allows executing arbitrary // SQL statements, but doesn't return any results, so it's only useful for SQL statements // that would be run for their effects (e.g., CREATE). diff --git a/go/vt/wrangler/vdiff.go b/go/vt/wrangler/vdiff.go index d02b8423311..9d4359fb1b5 100644 --- a/go/vt/wrangler/vdiff.go +++ b/go/vt/wrangler/vdiff.go @@ -42,6 +42,7 @@ import ( "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/vtctl/schematools" "vitess.io/vitess/go/vt/vtctl/workflow" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/engine" @@ -217,7 +218,7 @@ func (wr *Wrangler) VDiff(ctx context.Context, targetKeyspace, workflowName, sou oneFilter = bls.Filter break } - schm, err := wr.GetSchema(ctx, oneTarget.GetPrimary().Alias, nil, nil, false) + schm, err := schematools.GetSchema(ctx, wr.ts, wr.tmc, oneTarget.GetPrimary().Alias, nil, nil, false) if err != nil { return nil, vterrors.Wrap(err, "GetSchema") }