From e17782a5d2d4325f7afa97ca9bc29cfeec4d11ac Mon Sep 17 00:00:00 2001 From: Andrew Mason Date: Wed, 27 Oct 2021 06:54:49 -0400 Subject: [PATCH 1/4] Move wr.copySchemaShard to schematools to share with vtctldserver Signed-off-by: Andrew Mason --- go/vt/vtctl/schematools/copy.go | 91 +++++++++++++++++++++++++++++++++ go/vt/wrangler/schema.go | 47 +---------------- 2 files changed, 93 insertions(+), 45 deletions(-) create mode 100644 go/vt/vtctl/schematools/copy.go diff --git a/go/vt/vtctl/schematools/copy.go b/go/vt/vtctl/schematools/copy.go new file mode 100644 index 00000000000..7fb9662235f --- /dev/null +++ b/go/vt/vtctl/schematools/copy.go @@ -0,0 +1,91 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package schematools + +import ( + "bytes" + "context" + "fmt" + + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/vttablet/tmclient" + + topodatapb "vitess.io/vitess/go/vt/proto/topodata" +) + +// CopyShardMetadata copies the contents of the _vt.shard_metadata table from +// the source tablet to the destination tablet. +// +// NOTE: This function assumes that the destination tablet is a primary with +// binary logging enabled, in order to propagate the INSERT statements to any +// replicas in the destination shard. +func CopyShardMetadata(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, source *topodatapb.TabletAlias, dest *topodatapb.TabletAlias) error { + sourceTablet, err := ts.GetTablet(ctx, source) + if err != nil { + return fmt.Errorf("GetTablet(%v) failed: %w", topoproto.TabletAliasString(source), err) + } + + destTablet, err := ts.GetTablet(ctx, dest) + if err != nil { + return fmt.Errorf("GetTablet(%v) failed: %w", topoproto.TabletAliasString(dest), err) + } + + sql := "SELECT 1 FROM information_schema.tables WHERE table_schema = '_vt' AND table_name = 'shard_metadata'" + presenceResult, err := tmc.ExecuteFetchAsDba(ctx, sourceTablet.Tablet, false, []byte(sql), 1, false, false) + if err != nil { + return fmt.Errorf("ExecuteFetchAsDba(%v, false, %v, 1, false, false) failed: %v", topoproto.TabletAliasString(source), sql, err) + } + if len(presenceResult.Rows) == 0 { + log.Infof("_vt.shard_metadata doesn't exist on the source tablet %v, skipping its copy.", topoproto.TabletAliasString(source)) + return nil + } + + // (TODO|@ajm188,@deepthi): 100 may be too low here for row limit + sql = "SELECT db_name, name, value FROM _vt.shard_metadata" + p3qr, err := tmc.ExecuteFetchAsDba(ctx, sourceTablet.Tablet, false, []byte(sql), 100, false, false) + if err != nil { + return fmt.Errorf("ExecuteFetchAsDba(%v, false, %v, 100, false, false) failed: %v", topoproto.TabletAliasString(source), sql, err) + } + + qr := sqltypes.Proto3ToResult(p3qr) + queryBuf := bytes.NewBuffer(nil) + for _, row := range qr.Rows { + dbName := row[0] + name := row[1] + value := row[2] + queryBuf.WriteString("INSERT INTO _vt.shard_metadata (db_name, name, value) VALUES (") + dbName.EncodeSQL(queryBuf) + queryBuf.WriteByte(',') + name.EncodeSQL(queryBuf) + queryBuf.WriteByte(',') + value.EncodeSQL(queryBuf) + queryBuf.WriteString(") ON DUPLICATE KEY UPDATE value = ") + value.EncodeSQL(queryBuf) + + _, err := tmc.ExecuteFetchAsDba(ctx, destTablet.Tablet, false, queryBuf.Bytes(), 0, false, false) + if err != nil { + return fmt.Errorf("ExecuteFetchAsDba(%v, false, %v, 0, false, false) failed: %v", topoproto.TabletAliasString(dest), queryBuf.String(), err) + } + + queryBuf.Reset() + } + + return nil +} diff --git a/go/vt/wrangler/schema.go b/go/vt/wrangler/schema.go index fb2bacef432..615fa9d0046 100644 --- a/go/vt/wrangler/schema.go +++ b/go/vt/wrangler/schema.go @@ -26,7 +26,6 @@ import ( "context" - "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" @@ -34,6 +33,7 @@ import ( "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/vtctl/schematools" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" @@ -284,7 +284,7 @@ func (wr *Wrangler) CopySchemaShard(ctx context.Context, sourceTabletAlias *topo return fmt.Errorf("no primary in shard record %v/%v. Consider running 'vtctl InitShardPrimary' in case of a new shard or reparenting the shard to fix the topology data", destKeyspace, destShard) } - err = wr.copyShardMetadata(ctx, sourceTabletAlias, destShardInfo.PrimaryAlias) + err = schematools.CopyShardMetadata(ctx, wr.ts, wr.tmc, sourceTabletAlias, destShardInfo.PrimaryAlias) if err != nil { return fmt.Errorf("copyShardMetadata(%v, %v) failed: %v", sourceTabletAlias, destShardInfo.PrimaryAlias, err) } @@ -357,49 +357,6 @@ func (wr *Wrangler) CopySchemaShard(ctx context.Context, sourceTabletAlias *topo return err } -// copyShardMetadata copies contents of _vt.shard_metadata table from the source -// tablet to the destination tablet. It's assumed that destination tablet is a -// primary and binlogging is not turned off when INSERT statements are executed. -func (wr *Wrangler) copyShardMetadata(ctx context.Context, srcTabletAlias *topodatapb.TabletAlias, destTabletAlias *topodatapb.TabletAlias) error { - sql := "SELECT 1 FROM information_schema.tables WHERE table_schema = '_vt' AND table_name = 'shard_metadata'" - presenceResult, err := wr.ExecuteFetchAsDba(ctx, srcTabletAlias, sql, 1, false, false) - if err != nil { - return fmt.Errorf("ExecuteFetchAsDba(%v, %v, 1, false, false) failed: %v", srcTabletAlias, sql, err) - } - if len(presenceResult.Rows) == 0 { - log.Infof("_vt.shard_metadata doesn't exist on the source tablet %v, skipping its copy.", topoproto.TabletAliasString(srcTabletAlias)) - return nil - } - - // TODO: 100 may be too low here for row limit - sql = "SELECT db_name, name, value FROM _vt.shard_metadata" - dataProto, err := wr.ExecuteFetchAsDba(ctx, srcTabletAlias, sql, 100, false, false) - if err != nil { - return fmt.Errorf("ExecuteFetchAsDba(%v, %v, 100, false, false) failed: %v", srcTabletAlias, sql, err) - } - data := sqltypes.Proto3ToResult(dataProto) - for _, row := range data.Rows { - dbName := row[0] - name := row[1] - value := row[2] - queryBuf := bytes.Buffer{} - queryBuf.WriteString("INSERT INTO _vt.shard_metadata (db_name, name, value) VALUES (") - dbName.EncodeSQL(&queryBuf) - queryBuf.WriteByte(',') - name.EncodeSQL(&queryBuf) - queryBuf.WriteByte(',') - value.EncodeSQL(&queryBuf) - queryBuf.WriteString(") ON DUPLICATE KEY UPDATE value = ") - value.EncodeSQL(&queryBuf) - - _, err := wr.ExecuteFetchAsDba(ctx, destTabletAlias, queryBuf.String(), 0, false, false) - if err != nil { - return fmt.Errorf("ExecuteFetchAsDba(%v, %v, 0, false, false) failed: %v", destTabletAlias, queryBuf.String(), err) - } - } - return nil -} - // compareSchemas returns nil if the schema of the two tablets referenced by // "sourceAlias" and "destAlias" are identical. Otherwise, the difference is // returned as []string. From 1dae8ebd52f096582248f8373338f5ab95c8d274 Mon Sep 17 00:00:00 2001 From: Andrew Mason Date: Wed, 27 Oct 2021 09:17:34 -0400 Subject: [PATCH 2/4] Factor out wrangler's `GetSchema` to schematools Signed-off-by: Andrew Mason --- go/vt/vtctl/grpcvtctldserver/server.go | 10 ++---- go/vt/vtctl/schematools/schematools.go | 45 ++++++++++++++++++++++++++ go/vt/worker/multi_split_diff.go | 9 +++--- go/vt/worker/split_clone.go | 3 +- go/vt/worker/split_diff.go | 9 +++--- go/vt/worker/vertical_split_diff.go | 9 +++--- go/vt/wrangler/materializer.go | 5 +-- go/vt/wrangler/schema.go | 24 ++++---------- go/vt/wrangler/vdiff.go | 3 +- 9 files changed, 76 insertions(+), 41 deletions(-) create mode 100644 go/vt/vtctl/schematools/schematools.go diff --git a/go/vt/vtctl/grpcvtctldserver/server.go b/go/vt/vtctl/grpcvtctldserver/server.go index e12f25e1d2c..1cfd2055f9f 100644 --- a/go/vt/vtctl/grpcvtctldserver/server.go +++ b/go/vt/vtctl/grpcvtctldserver/server.go @@ -846,21 +846,15 @@ func (s *VtctldServer) GetSchema(ctx context.Context, req *vtctldatapb.GetSchema defer span.Finish() span.Annotate("tablet_alias", topoproto.TabletAliasString(req.TabletAlias)) - - tablet, err := s.ts.GetTablet(ctx, req.TabletAlias) - if err != nil { - return nil, fmt.Errorf("GetTablet(%v) failed: %w", req.TabletAlias, err) - } - span.Annotate("tables", strings.Join(req.Tables, ",")) span.Annotate("exclude_tables", strings.Join(req.ExcludeTables, ",")) span.Annotate("include_views", req.IncludeViews) span.Annotate("table_names_only", req.TableNamesOnly) span.Annotate("table_sizes_only", req.TableSizesOnly) - sd, err := s.tmc.GetSchema(ctx, tablet.Tablet, req.Tables, req.ExcludeTables, req.IncludeViews) + sd, err := schematools.GetSchema(ctx, s.ts, s.tmc, req.TabletAlias, req.Tables, req.ExcludeTables, req.IncludeViews) if err != nil { - return nil, fmt.Errorf("GetSchema(%v, %v, %v, %v) failed: %w", tablet.Tablet, req.Tables, req.ExcludeTables, req.IncludeViews, err) + return nil, err } if req.TableNamesOnly { diff --git a/go/vt/vtctl/schematools/schematools.go b/go/vt/vtctl/schematools/schematools.go new file mode 100644 index 00000000000..41e6f4c5dc5 --- /dev/null +++ b/go/vt/vtctl/schematools/schematools.go @@ -0,0 +1,45 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package schematools + +import ( + "context" + + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/tmclient" + + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/proto/vtrpc" +) + +// GetSchema makes an RPC to get the schema from a remote tablet, after +// verifying a tablet with that alias exists in the topo. +func GetSchema(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, alias *topodatapb.TabletAlias, tables []string, excludeTables []string, includeViews bool) (*tabletmanagerdatapb.SchemaDefinition, error) { + ti, err := ts.GetTablet(ctx, alias) + if err != nil { + return nil, vterrors.Errorf(vtrpc.Code_NOT_FOUND, "GetTablet(%v) failed: %v", alias, err) + } + + sd, err := tmc.GetSchema(ctx, ti.Tablet, tables, excludeTables, includeViews) + if err != nil { + return nil, vterrors.Wrapf(err, "GetSchema(%v, %v, %v, %v) failed: %v", ti.Tablet, tables, excludeTables, includeViews, err) + } + + return sd, nil +} diff --git a/go/vt/worker/multi_split_diff.go b/go/vt/worker/multi_split_diff.go index d1749302788..06210870e73 100644 --- a/go/vt/worker/multi_split_diff.go +++ b/go/vt/worker/multi_split_diff.go @@ -26,6 +26,7 @@ import ( "context" "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/vtctl/schematools" "vitess.io/vitess/go/vt/vttablet/queryservice" "vitess.io/vitess/go/vt/vttablet/tabletconn" @@ -761,8 +762,8 @@ func (msdw *MultiSplitDiffWorker) gatherSchemaInfo(ctx context.Context) ([]*tabl go func(i int, destinationAlias *topodatapb.TabletAlias) { var err error shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) - destinationSchemaDefinition, err := msdw.wr.GetSchema( - shortCtx, destinationAlias, nil /* tables */, msdw.excludeTables, false /* includeViews */) + destinationSchemaDefinition, err := schematools.GetSchema( + shortCtx, msdw.wr.TopoServer(), msdw.wr.TabletManagerClient(), destinationAlias, nil /* tables */, msdw.excludeTables, false /* includeViews */) cancel() if err != nil { msdw.markAsWillFail(rec, err) @@ -776,8 +777,8 @@ func (msdw *MultiSplitDiffWorker) gatherSchemaInfo(ctx context.Context) ([]*tabl go func() { var err error shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) - sourceSchemaDefinition, err = msdw.wr.GetSchema( - shortCtx, msdw.sourceAlias, nil /* tables */, msdw.excludeTables, false /* includeViews */) + sourceSchemaDefinition, err = schematools.GetSchema( + shortCtx, msdw.wr.TopoServer(), msdw.wr.TabletManagerClient(), msdw.sourceAlias, nil /* tables */, msdw.excludeTables, false /* includeViews */) cancel() if err != nil { msdw.markAsWillFail(rec, err) diff --git a/go/vt/worker/split_clone.go b/go/vt/worker/split_clone.go index f5ba41a35c9..b3bb2d47cdd 100644 --- a/go/vt/worker/split_clone.go +++ b/go/vt/worker/split_clone.go @@ -35,6 +35,7 @@ import ( "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/topotools" + "vitess.io/vitess/go/vt/vtctl/schematools" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/vindexes" "vitess.io/vitess/go/vt/vttablet/tabletconn" @@ -1320,7 +1321,7 @@ func (scw *SplitCloneWorker) getSourceSchema(ctx context.Context, tablet *topoda // in each source shard for each table to be about the same // (rowCount is used to estimate an ETA) shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) - sourceSchemaDefinition, err := scw.wr.GetSchema(shortCtx, tablet.Alias, scw.tables, scw.excludeTables, false /* includeViews */) + sourceSchemaDefinition, err := schematools.GetSchema(shortCtx, scw.wr.TopoServer(), scw.wr.TabletManagerClient(), tablet.Alias, scw.tables, scw.excludeTables, false /* includeViews */) cancel() if err != nil { return nil, vterrors.Wrapf(err, "cannot get schema from source %v", topoproto.TabletAliasString(tablet.Alias)) diff --git a/go/vt/worker/split_diff.go b/go/vt/worker/split_diff.go index 14ce0377c7f..1a93dfb680d 100644 --- a/go/vt/worker/split_diff.go +++ b/go/vt/worker/split_diff.go @@ -22,6 +22,7 @@ import ( "sync" "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/vtctl/schematools" "vitess.io/vitess/go/vt/vterrors" "context" @@ -404,8 +405,8 @@ func (sdw *SplitDiffWorker) diff(ctx context.Context) error { go func() { var err error shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) - sdw.destinationSchemaDefinition, err = sdw.wr.GetSchema( - shortCtx, sdw.destinationAlias, nil /* tables */, sdw.excludeTables, false /* includeViews */) + sdw.destinationSchemaDefinition, err = schematools.GetSchema( + shortCtx, sdw.wr.TopoServer(), sdw.wr.TabletManagerClient(), sdw.destinationAlias, nil /* tables */, sdw.excludeTables, false /* includeViews */) cancel() if err != nil { sdw.markAsWillFail(rec, err) @@ -417,8 +418,8 @@ func (sdw *SplitDiffWorker) diff(ctx context.Context) error { go func() { var err error shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) - sdw.sourceSchemaDefinition, err = sdw.wr.GetSchema( - shortCtx, sdw.sourceAlias, nil /* tables */, sdw.excludeTables, false /* includeViews */) + sdw.sourceSchemaDefinition, err = schematools.GetSchema( + shortCtx, sdw.wr.TopoServer(), sdw.wr.TabletManagerClient(), sdw.sourceAlias, nil /* tables */, sdw.excludeTables, false /* includeViews */) cancel() if err != nil { sdw.markAsWillFail(rec, err) diff --git a/go/vt/worker/vertical_split_diff.go b/go/vt/worker/vertical_split_diff.go index b857ca55126..fc01c8c8e63 100644 --- a/go/vt/worker/vertical_split_diff.go +++ b/go/vt/worker/vertical_split_diff.go @@ -21,6 +21,7 @@ import ( "html/template" "sync" + "vitess.io/vitess/go/vt/vtctl/schematools" "vitess.io/vitess/go/vt/vterrors" "context" @@ -366,8 +367,8 @@ func (vsdw *VerticalSplitDiffWorker) diff(ctx context.Context) error { go func() { var err error shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) - vsdw.destinationSchemaDefinition, err = vsdw.wr.GetSchema( - shortCtx, vsdw.destinationAlias, vsdw.shardInfo.SourceShards[0].Tables, nil /* excludeTables */, false /* includeViews */) + vsdw.destinationSchemaDefinition, err = schematools.GetSchema( + shortCtx, vsdw.wr.TopoServer(), vsdw.wr.TabletManagerClient(), vsdw.destinationAlias, vsdw.shardInfo.SourceShards[0].Tables, nil /* excludeTables */, false /* includeViews */) cancel() if err != nil { vsdw.markAsWillFail(rec, err) @@ -379,8 +380,8 @@ func (vsdw *VerticalSplitDiffWorker) diff(ctx context.Context) error { go func() { var err error shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout) - vsdw.sourceSchemaDefinition, err = vsdw.wr.GetSchema( - shortCtx, vsdw.sourceAlias, vsdw.shardInfo.SourceShards[0].Tables, nil /* excludeTables */, false /* includeViews */) + vsdw.sourceSchemaDefinition, err = schematools.GetSchema( + shortCtx, vsdw.wr.TopoServer(), vsdw.wr.TabletManagerClient(), vsdw.sourceAlias, vsdw.shardInfo.SourceShards[0].Tables, nil /* excludeTables */, false /* includeViews */) cancel() if err != nil { vsdw.markAsWillFail(rec, err) diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index f30da0cd79d..13f99d9f877 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -32,6 +32,7 @@ import ( querypb "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/topotools" + "vitess.io/vitess/go/vt/vtctl/schematools" "vitess.io/vitess/go/vt/vtctl/workflow" "vitess.io/vitess/go/vt/vtgate/evalengine" @@ -525,7 +526,7 @@ func (wr *Wrangler) prepareCreateLookup(ctx context.Context, keyspace string, sp if onesource.PrimaryAlias == nil { return nil, nil, nil, fmt.Errorf("source shard has no primary: %v", onesource.ShardName()) } - tableSchema, err := wr.GetSchema(ctx, onesource.PrimaryAlias, []string{sourceTableName}, nil, false) + tableSchema, err := schematools.GetSchema(ctx, wr.ts, wr.tmc, onesource.PrimaryAlias, []string{sourceTableName}, nil, false) if err != nil { return nil, nil, nil, err } @@ -922,7 +923,7 @@ func (mz *materializer) deploySchema(ctx context.Context) error { allTables := []string{"/.*/"} hasTargetTable := map[string]bool{} - targetSchema, err := mz.wr.GetSchema(ctx, target.PrimaryAlias, allTables, nil, false) + targetSchema, err := schematools.GetSchema(ctx, mz.wr.ts, mz.wr.tmc, target.PrimaryAlias, allTables, nil, false) if err != nil { return err } diff --git a/go/vt/wrangler/schema.go b/go/vt/wrangler/schema.go index 615fa9d0046..ce3580b811d 100644 --- a/go/vt/wrangler/schema.go +++ b/go/vt/wrangler/schema.go @@ -45,21 +45,11 @@ const ( DefaultWaitReplicasTimeout = 10 * time.Second ) -// GetSchema uses an RPC to get the schema from a remote tablet -func (wr *Wrangler) GetSchema(ctx context.Context, tabletAlias *topodatapb.TabletAlias, tables, excludeTables []string, includeViews bool) (*tabletmanagerdatapb.SchemaDefinition, error) { - ti, err := wr.ts.GetTablet(ctx, tabletAlias) - if err != nil { - return nil, fmt.Errorf("GetTablet(%v) failed: %v", tabletAlias, err) - } - - return wr.tmc.GetSchema(ctx, ti.Tablet, tables, excludeTables, includeViews) -} - // helper method to asynchronously diff a schema func (wr *Wrangler) diffSchema(ctx context.Context, primarySchema *tabletmanagerdatapb.SchemaDefinition, primaryTabletAlias, alias *topodatapb.TabletAlias, excludeTables []string, includeViews bool, wg *sync.WaitGroup, er concurrency.ErrorRecorder) { defer wg.Done() log.Infof("Gathering schema for %v", topoproto.TabletAliasString(alias)) - replicaSchema, err := wr.GetSchema(ctx, alias, nil, excludeTables, includeViews) + replicaSchema, err := schematools.GetSchema(ctx, wr.ts, wr.tmc, alias, nil, excludeTables, includeViews) if err != nil { er.RecordError(fmt.Errorf("GetSchema(%v, nil, %v, %v) failed: %v", alias, excludeTables, includeViews, err)) return @@ -81,7 +71,7 @@ func (wr *Wrangler) ValidateSchemaShard(ctx context.Context, keyspace, shard str return fmt.Errorf("no primary in shard %v/%v", keyspace, shard) } log.Infof("Gathering schema for primary %v", topoproto.TabletAliasString(si.PrimaryAlias)) - primarySchema, err := wr.GetSchema(ctx, si.PrimaryAlias, nil, excludeTables, includeViews) + primarySchema, err := schematools.GetSchema(ctx, wr.ts, wr.tmc, si.PrimaryAlias, nil, excludeTables, includeViews) if err != nil { return fmt.Errorf("GetSchema(%v, nil, %v, %v) failed: %v", si.PrimaryAlias, excludeTables, includeViews, err) } @@ -170,7 +160,7 @@ func (wr *Wrangler) ValidateSchemaKeyspace(ctx context.Context, keyspace string, if referenceSchema == nil { referenceAlias = si.PrimaryAlias log.Infof("Gathering schema for reference primary %v", topoproto.TabletAliasString(referenceAlias)) - referenceSchema, err = wr.GetSchema(ctx, referenceAlias, nil, excludeTables, includeViews) + referenceSchema, err = schematools.GetSchema(ctx, wr.ts, wr.tmc, referenceAlias, nil, excludeTables, includeViews) if err != nil { return fmt.Errorf("GetSchema(%v, nil, %v, %v) failed: %v", referenceAlias, excludeTables, includeViews, err) } @@ -218,7 +208,7 @@ func (wr *Wrangler) ValidateVSchema(ctx context.Context, keyspace string, shards shardFailures.RecordError(fmt.Errorf("GetShard(%v, %v) failed: %v", keyspace, shard, err)) return } - primarySchema, err := wr.GetSchema(ctx, si.PrimaryAlias, nil, excludeTables, includeViews) + primarySchema, err := schematools.GetSchema(ctx, wr.ts, wr.tmc, si.PrimaryAlias, nil, excludeTables, includeViews) if err != nil { shardFailures.RecordError(fmt.Errorf("GetSchema(%s, nil, %v, %v) (%v/%v) failed: %v", si.PrimaryAlias.String(), excludeTables, includeViews, keyspace, shard, err, @@ -298,7 +288,7 @@ func (wr *Wrangler) CopySchemaShard(ctx context.Context, sourceTabletAlias *topo return nil } - sourceSd, err := wr.GetSchema(ctx, sourceTabletAlias, tables, excludeTables, includeViews) + sourceSd, err := schematools.GetSchema(ctx, wr.ts, wr.tmc, sourceTabletAlias, tables, excludeTables, includeViews) if err != nil { return fmt.Errorf("GetSchema(%v, %v, %v, %v) failed: %v", sourceTabletAlias, tables, excludeTables, includeViews, err) } @@ -361,11 +351,11 @@ func (wr *Wrangler) CopySchemaShard(ctx context.Context, sourceTabletAlias *topo // "sourceAlias" and "destAlias" are identical. Otherwise, the difference is // returned as []string. func (wr *Wrangler) compareSchemas(ctx context.Context, sourceAlias, destAlias *topodatapb.TabletAlias, tables, excludeTables []string, includeViews bool) ([]string, error) { - sourceSd, err := wr.GetSchema(ctx, sourceAlias, tables, excludeTables, includeViews) + sourceSd, err := schematools.GetSchema(ctx, wr.ts, wr.tmc, sourceAlias, tables, excludeTables, includeViews) if err != nil { return nil, fmt.Errorf("failed to get schema from tablet %v. err: %v", sourceAlias, err) } - destSd, err := wr.GetSchema(ctx, destAlias, tables, excludeTables, includeViews) + destSd, err := schematools.GetSchema(ctx, wr.ts, wr.tmc, destAlias, tables, excludeTables, includeViews) if err != nil { return nil, fmt.Errorf("failed to get schema from tablet %v. err: %v", destAlias, err) } diff --git a/go/vt/wrangler/vdiff.go b/go/vt/wrangler/vdiff.go index fa7f2147744..d7819dc8322 100644 --- a/go/vt/wrangler/vdiff.go +++ b/go/vt/wrangler/vdiff.go @@ -41,6 +41,7 @@ import ( "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/vtctl/schematools" "vitess.io/vitess/go/vt/vtctl/workflow" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/engine" @@ -216,7 +217,7 @@ func (wr *Wrangler) VDiff(ctx context.Context, targetKeyspace, workflowName, sou oneFilter = bls.Filter break } - schm, err := wr.GetSchema(ctx, oneTarget.GetPrimary().Alias, nil, nil, false) + schm, err := schematools.GetSchema(ctx, wr.ts, wr.tmc, oneTarget.GetPrimary().Alias, nil, nil, false) if err != nil { return nil, vterrors.Wrap(err, "GetSchema") } From 1281c65f7fa46bbdf8f0b262db608a6cbb6287c3 Mon Sep 17 00:00:00 2001 From: Andrew Mason Date: Wed, 27 Oct 2021 09:21:50 -0400 Subject: [PATCH 3/4] Tidying up the imports, everywhere I go Signed-off-by: Andrew Mason --- go/vt/worker/multi_split_diff.go | 20 +++++++++----------- go/vt/worker/split_clone.go | 5 ++--- go/vt/worker/split_diff.go | 10 ++++------ go/vt/worker/vertical_split_diff.go | 8 +++----- go/vt/wrangler/materializer.go | 25 ++++++++++++------------- go/vt/wrangler/schema.go | 3 +-- 6 files changed, 31 insertions(+), 40 deletions(-) diff --git a/go/vt/worker/multi_split_diff.go b/go/vt/worker/multi_split_diff.go index 06210870e73..cf6a6cfb939 100644 --- a/go/vt/worker/multi_split_diff.go +++ b/go/vt/worker/multi_split_diff.go @@ -17,30 +17,28 @@ limitations under the License. package worker import ( + "context" "fmt" "html/template" "sort" "sync" "time" - "context" - - "vitess.io/vitess/go/vt/proto/vtrpc" - "vitess.io/vitess/go/vt/vtctl/schematools" - "vitess.io/vitess/go/vt/vttablet/queryservice" - "vitess.io/vitess/go/vt/vttablet/tabletconn" - + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/mysqlctl/tmutils" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vtctl/schematools" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vtgate/vindexes" + "vitess.io/vitess/go/vt/vttablet/queryservice" + "vitess.io/vitess/go/vt/vttablet/tabletconn" "vitess.io/vitess/go/vt/wrangler" - "vitess.io/vitess/go/sqltypes" - "vitess.io/vitess/go/vt/binlog/binlogplayer" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" - "vitess.io/vitess/go/vt/vterrors" - "vitess.io/vitess/go/vt/vtgate/vindexes" + "vitess.io/vitess/go/vt/proto/vtrpc" ) // Scanners encapsulates a source and a destination. We create one of these per parallel runner. diff --git a/go/vt/worker/split_clone.go b/go/vt/worker/split_clone.go index b3bb2d47cdd..dd658e6e9e8 100644 --- a/go/vt/worker/split_clone.go +++ b/go/vt/worker/split_clone.go @@ -17,20 +17,18 @@ limitations under the License. package worker import ( + "context" "fmt" "html/template" "strings" "sync" "time" - "context" - "vitess.io/vitess/go/event" "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/discovery" - "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/throttler" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" @@ -45,6 +43,7 @@ import ( binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/proto/vtrpc" ) // cloneType specifies whether it is a horizontal resharding or a vertical split. diff --git a/go/vt/worker/split_diff.go b/go/vt/worker/split_diff.go index 1a93dfb680d..47d7cd756d6 100644 --- a/go/vt/worker/split_diff.go +++ b/go/vt/worker/split_diff.go @@ -17,16 +17,11 @@ limitations under the License. package worker import ( + "context" "html/template" "sort" "sync" - "vitess.io/vitess/go/vt/proto/vtrpc" - "vitess.io/vitess/go/vt/vtctl/schematools" - "vitess.io/vitess/go/vt/vterrors" - - "context" - "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/vt/binlog/binlogplayer" @@ -34,11 +29,14 @@ import ( "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/mysqlctl/tmutils" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vtctl/schematools" + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/vindexes" "vitess.io/vitess/go/vt/wrangler" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/proto/vtrpc" ) // SplitDiffWorker executes a diff between a destination shard and its diff --git a/go/vt/worker/vertical_split_diff.go b/go/vt/worker/vertical_split_diff.go index fc01c8c8e63..167acfaadaf 100644 --- a/go/vt/worker/vertical_split_diff.go +++ b/go/vt/worker/vertical_split_diff.go @@ -17,15 +17,11 @@ limitations under the License. package worker import ( + "context" "fmt" "html/template" "sync" - "vitess.io/vitess/go/vt/vtctl/schematools" - "vitess.io/vitess/go/vt/vterrors" - - "context" - "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/vt/binlog/binlogplayer" @@ -33,6 +29,8 @@ import ( "vitess.io/vitess/go/vt/mysqlctl/tmutils" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/vtctl/schematools" + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/wrangler" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index 13f99d9f877..6fefdafea45 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -17,6 +17,7 @@ limitations under the License. package wrangler import ( + "context" "fmt" "hash/fnv" "math" @@ -27,30 +28,28 @@ import ( "google.golang.org/protobuf/proto" + "vitess.io/vitess/go/json2" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/binlog/binlogplayer" + "vitess.io/vitess/go/vt/concurrency" + "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/mysqlctl/tmutils" - querypb "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/schema" + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topotools" "vitess.io/vitess/go/vt/vtctl/schematools" "vitess.io/vitess/go/vt/vtctl/workflow" + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/evalengine" + "vitess.io/vitess/go/vt/vtgate/vindexes" + "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" - "context" - - "vitess.io/vitess/go/json2" - "vitess.io/vitess/go/sqltypes" - "vitess.io/vitess/go/vt/binlog/binlogplayer" - "vitess.io/vitess/go/vt/concurrency" - "vitess.io/vitess/go/vt/key" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" vschemapb "vitess.io/vitess/go/vt/proto/vschema" vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" - "vitess.io/vitess/go/vt/sqlparser" - "vitess.io/vitess/go/vt/topo" - "vitess.io/vitess/go/vt/vterrors" - "vitess.io/vitess/go/vt/vtgate/vindexes" - "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" ) type materializer struct { diff --git a/go/vt/wrangler/schema.go b/go/vt/wrangler/schema.go index ce3580b811d..a3ff5e86925 100644 --- a/go/vt/wrangler/schema.go +++ b/go/vt/wrangler/schema.go @@ -18,14 +18,13 @@ package wrangler import ( "bytes" + "context" "fmt" "html/template" "sort" "sync" "time" - "context" - "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" From 5e19f89c1504308ac9dd7f207b53e71e3abe54e0 Mon Sep 17 00:00:00 2001 From: Andrew Mason Date: Sat, 30 Oct 2021 09:12:16 -0400 Subject: [PATCH 4/4] Factor out `compareSchemas` to schematools Signed-off-by: Andrew Mason --- go/vt/vtctl/schematools/diff.go | 55 +++++++++++++++++++++++++++++++++ go/vt/wrangler/schema.go | 19 ++---------- 2 files changed, 57 insertions(+), 17 deletions(-) create mode 100644 go/vt/vtctl/schematools/diff.go diff --git a/go/vt/vtctl/schematools/diff.go b/go/vt/vtctl/schematools/diff.go new file mode 100644 index 00000000000..2d3c8bf018e --- /dev/null +++ b/go/vt/vtctl/schematools/diff.go @@ -0,0 +1,55 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package schematools + +import ( + "context" + "fmt" + + "vitess.io/vitess/go/vt/mysqlctl/tmutils" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vttablet/tmclient" + + topodatapb "vitess.io/vitess/go/vt/proto/topodata" +) + +// CompareSchemas returns (nil, nil) if the schema of the two tablets match. If +// there are diffs, they are returned as (diffs []string, nil). +// +// If fetching the schema for either tablet fails, a non-nil error is returned. +func CompareSchemas( + ctx context.Context, + ts *topo.Server, + tmc tmclient.TabletManagerClient, + source *topodatapb.TabletAlias, + dest *topodatapb.TabletAlias, + tables []string, + excludeTables []string, + includeViews bool, +) (diffs []string, err error) { + sourceSchema, err := GetSchema(ctx, ts, tmc, source, tables, excludeTables, includeViews) + if err != nil { + return nil, fmt.Errorf("failed to get schema from tablet %v. err: %v", source, err) + } + + destSchema, err := GetSchema(ctx, ts, tmc, dest, tables, excludeTables, includeViews) + if err != nil { + return nil, fmt.Errorf("failed to get schema from tablet %v. err: %v", dest, err) + } + + return tmutils.DiffSchemaToArray("source", sourceSchema, "dest", destSchema), nil +} diff --git a/go/vt/wrangler/schema.go b/go/vt/wrangler/schema.go index a3ff5e86925..4a77738f154 100644 --- a/go/vt/wrangler/schema.go +++ b/go/vt/wrangler/schema.go @@ -278,7 +278,7 @@ func (wr *Wrangler) CopySchemaShard(ctx context.Context, sourceTabletAlias *topo return fmt.Errorf("copyShardMetadata(%v, %v) failed: %v", sourceTabletAlias, destShardInfo.PrimaryAlias, err) } - diffs, err := wr.compareSchemas(ctx, sourceTabletAlias, destShardInfo.PrimaryAlias, tables, excludeTables, includeViews) + diffs, err := schematools.CompareSchemas(ctx, wr.ts, wr.tmc, sourceTabletAlias, destShardInfo.PrimaryAlias, tables, excludeTables, includeViews) if err != nil { return fmt.Errorf("CopySchemaShard failed because schemas could not be compared initially: %v", err) } @@ -319,7 +319,7 @@ func (wr *Wrangler) CopySchemaShard(ctx context.Context, sourceTabletAlias *topo // statement. We want to fail early in this case because vtworker SplitDiff // fails in case of such an inconsistency as well. if !skipVerify { - diffs, err = wr.compareSchemas(ctx, sourceTabletAlias, destShardInfo.PrimaryAlias, tables, excludeTables, includeViews) + diffs, err = schematools.CompareSchemas(ctx, wr.ts, wr.tmc, sourceTabletAlias, destShardInfo.PrimaryAlias, tables, excludeTables, includeViews) if err != nil { return fmt.Errorf("CopySchemaShard failed because schemas could not be compared finally: %v", err) } @@ -346,21 +346,6 @@ func (wr *Wrangler) CopySchemaShard(ctx context.Context, sourceTabletAlias *topo return err } -// compareSchemas returns nil if the schema of the two tablets referenced by -// "sourceAlias" and "destAlias" are identical. Otherwise, the difference is -// returned as []string. -func (wr *Wrangler) compareSchemas(ctx context.Context, sourceAlias, destAlias *topodatapb.TabletAlias, tables, excludeTables []string, includeViews bool) ([]string, error) { - sourceSd, err := schematools.GetSchema(ctx, wr.ts, wr.tmc, sourceAlias, tables, excludeTables, includeViews) - if err != nil { - return nil, fmt.Errorf("failed to get schema from tablet %v. err: %v", sourceAlias, err) - } - destSd, err := schematools.GetSchema(ctx, wr.ts, wr.tmc, destAlias, tables, excludeTables, includeViews) - if err != nil { - return nil, fmt.Errorf("failed to get schema from tablet %v. err: %v", destAlias, err) - } - return tmutils.DiffSchemaToArray("source", sourceSd, "dest", destSd), nil -} - // applySQLShard applies a given SQL change on a given tablet alias. It allows executing arbitrary // SQL statements, but doesn't return any results, so it's only useful for SQL statements // that would be run for their effects (e.g., CREATE).