Skip to content

Commit

Permalink
Merge pull request #9123 from ajm188/more-schematools
Browse files Browse the repository at this point in the history
[schematools] Move more functions out of `wrangler` to package importable by `grpcvtctldserver`
  • Loading branch information
ajm188 authored Nov 4, 2021
2 parents 40b479a + 5e19f89 commit 8bda378
Show file tree
Hide file tree
Showing 11 changed files with 252 additions and 138 deletions.
10 changes: 2 additions & 8 deletions go/vt/vtctl/grpcvtctldserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -882,21 +882,15 @@ func (s *VtctldServer) GetSchema(ctx context.Context, req *vtctldatapb.GetSchema
defer span.Finish()

span.Annotate("tablet_alias", topoproto.TabletAliasString(req.TabletAlias))

tablet, err := s.ts.GetTablet(ctx, req.TabletAlias)
if err != nil {
return nil, fmt.Errorf("GetTablet(%v) failed: %w", req.TabletAlias, err)
}

span.Annotate("tables", strings.Join(req.Tables, ","))
span.Annotate("exclude_tables", strings.Join(req.ExcludeTables, ","))
span.Annotate("include_views", req.IncludeViews)
span.Annotate("table_names_only", req.TableNamesOnly)
span.Annotate("table_sizes_only", req.TableSizesOnly)

sd, err := s.tmc.GetSchema(ctx, tablet.Tablet, req.Tables, req.ExcludeTables, req.IncludeViews)
sd, err := schematools.GetSchema(ctx, s.ts, s.tmc, req.TabletAlias, req.Tables, req.ExcludeTables, req.IncludeViews)
if err != nil {
return nil, fmt.Errorf("GetSchema(%v, %v, %v, %v) failed: %w", tablet.Tablet, req.Tables, req.ExcludeTables, req.IncludeViews, err)
return nil, err
}

if req.TableNamesOnly {
Expand Down
91 changes: 91 additions & 0 deletions go/vt/vtctl/schematools/copy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
Copyright 2021 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package schematools

import (
"bytes"
"context"
"fmt"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vttablet/tmclient"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

// CopyShardMetadata copies the contents of the _vt.shard_metadata table from
// the source tablet to the destination tablet.
//
// NOTE: This function assumes that the destination tablet is a primary with
// binary logging enabled, in order to propagate the INSERT statements to any
// replicas in the destination shard.
func CopyShardMetadata(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, source *topodatapb.TabletAlias, dest *topodatapb.TabletAlias) error {
sourceTablet, err := ts.GetTablet(ctx, source)
if err != nil {
return fmt.Errorf("GetTablet(%v) failed: %w", topoproto.TabletAliasString(source), err)
}

destTablet, err := ts.GetTablet(ctx, dest)
if err != nil {
return fmt.Errorf("GetTablet(%v) failed: %w", topoproto.TabletAliasString(dest), err)
}

sql := "SELECT 1 FROM information_schema.tables WHERE table_schema = '_vt' AND table_name = 'shard_metadata'"
presenceResult, err := tmc.ExecuteFetchAsDba(ctx, sourceTablet.Tablet, false, []byte(sql), 1, false, false)
if err != nil {
return fmt.Errorf("ExecuteFetchAsDba(%v, false, %v, 1, false, false) failed: %v", topoproto.TabletAliasString(source), sql, err)
}
if len(presenceResult.Rows) == 0 {
log.Infof("_vt.shard_metadata doesn't exist on the source tablet %v, skipping its copy.", topoproto.TabletAliasString(source))
return nil
}

// (TODO|@ajm188,@deepthi): 100 may be too low here for row limit
sql = "SELECT db_name, name, value FROM _vt.shard_metadata"
p3qr, err := tmc.ExecuteFetchAsDba(ctx, sourceTablet.Tablet, false, []byte(sql), 100, false, false)
if err != nil {
return fmt.Errorf("ExecuteFetchAsDba(%v, false, %v, 100, false, false) failed: %v", topoproto.TabletAliasString(source), sql, err)
}

qr := sqltypes.Proto3ToResult(p3qr)
queryBuf := bytes.NewBuffer(nil)
for _, row := range qr.Rows {
dbName := row[0]
name := row[1]
value := row[2]
queryBuf.WriteString("INSERT INTO _vt.shard_metadata (db_name, name, value) VALUES (")
dbName.EncodeSQL(queryBuf)
queryBuf.WriteByte(',')
name.EncodeSQL(queryBuf)
queryBuf.WriteByte(',')
value.EncodeSQL(queryBuf)
queryBuf.WriteString(") ON DUPLICATE KEY UPDATE value = ")
value.EncodeSQL(queryBuf)

_, err := tmc.ExecuteFetchAsDba(ctx, destTablet.Tablet, false, queryBuf.Bytes(), 0, false, false)
if err != nil {
return fmt.Errorf("ExecuteFetchAsDba(%v, false, %v, 0, false, false) failed: %v", topoproto.TabletAliasString(dest), queryBuf.String(), err)
}

queryBuf.Reset()
}

return nil
}
55 changes: 55 additions & 0 deletions go/vt/vtctl/schematools/diff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
Copyright 2021 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package schematools

import (
"context"
"fmt"

"vitess.io/vitess/go/vt/mysqlctl/tmutils"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vttablet/tmclient"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

// CompareSchemas returns (nil, nil) if the schema of the two tablets match. If
// there are diffs, they are returned as (diffs []string, nil).
//
// If fetching the schema for either tablet fails, a non-nil error is returned.
func CompareSchemas(
ctx context.Context,
ts *topo.Server,
tmc tmclient.TabletManagerClient,
source *topodatapb.TabletAlias,
dest *topodatapb.TabletAlias,
tables []string,
excludeTables []string,
includeViews bool,
) (diffs []string, err error) {
sourceSchema, err := GetSchema(ctx, ts, tmc, source, tables, excludeTables, includeViews)
if err != nil {
return nil, fmt.Errorf("failed to get schema from tablet %v. err: %v", source, err)
}

destSchema, err := GetSchema(ctx, ts, tmc, dest, tables, excludeTables, includeViews)
if err != nil {
return nil, fmt.Errorf("failed to get schema from tablet %v. err: %v", dest, err)
}

return tmutils.DiffSchemaToArray("source", sourceSchema, "dest", destSchema), nil
}
45 changes: 45 additions & 0 deletions go/vt/vtctl/schematools/schematools.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
Copyright 2021 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package schematools

import (
"context"

"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tmclient"

tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/proto/vtrpc"
)

// GetSchema makes an RPC to get the schema from a remote tablet, after
// verifying a tablet with that alias exists in the topo.
func GetSchema(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, alias *topodatapb.TabletAlias, tables []string, excludeTables []string, includeViews bool) (*tabletmanagerdatapb.SchemaDefinition, error) {
ti, err := ts.GetTablet(ctx, alias)
if err != nil {
return nil, vterrors.Errorf(vtrpc.Code_NOT_FOUND, "GetTablet(%v) failed: %v", alias, err)
}

sd, err := tmc.GetSchema(ctx, ti.Tablet, tables, excludeTables, includeViews)
if err != nil {
return nil, vterrors.Wrapf(err, "GetSchema(%v, %v, %v, %v) failed: %v", ti.Tablet, tables, excludeTables, includeViews, err)
}

return sd, nil
}
27 changes: 13 additions & 14 deletions go/vt/worker/multi_split_diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,28 @@ limitations under the License.
package worker

import (
"context"
"fmt"
"html/template"
"sort"
"sync"
"time"

"context"

"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vttablet/queryservice"
"vitess.io/vitess/go/vt/vttablet/tabletconn"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/mysqlctl/tmutils"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtctl/schematools"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/vindexes"
"vitess.io/vitess/go/vt/vttablet/queryservice"
"vitess.io/vitess/go/vt/vttablet/tabletconn"
"vitess.io/vitess/go/vt/wrangler"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/vindexes"
"vitess.io/vitess/go/vt/proto/vtrpc"
)

// Scanners encapsulates a source and a destination. We create one of these per parallel runner.
Expand Down Expand Up @@ -761,8 +760,8 @@ func (msdw *MultiSplitDiffWorker) gatherSchemaInfo(ctx context.Context) ([]*tabl
go func(i int, destinationAlias *topodatapb.TabletAlias) {
var err error
shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
destinationSchemaDefinition, err := msdw.wr.GetSchema(
shortCtx, destinationAlias, nil /* tables */, msdw.excludeTables, false /* includeViews */)
destinationSchemaDefinition, err := schematools.GetSchema(
shortCtx, msdw.wr.TopoServer(), msdw.wr.TabletManagerClient(), destinationAlias, nil /* tables */, msdw.excludeTables, false /* includeViews */)
cancel()
if err != nil {
msdw.markAsWillFail(rec, err)
Expand All @@ -776,8 +775,8 @@ func (msdw *MultiSplitDiffWorker) gatherSchemaInfo(ctx context.Context) ([]*tabl
go func() {
var err error
shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
sourceSchemaDefinition, err = msdw.wr.GetSchema(
shortCtx, msdw.sourceAlias, nil /* tables */, msdw.excludeTables, false /* includeViews */)
sourceSchemaDefinition, err = schematools.GetSchema(
shortCtx, msdw.wr.TopoServer(), msdw.wr.TabletManagerClient(), msdw.sourceAlias, nil /* tables */, msdw.excludeTables, false /* includeViews */)
cancel()
if err != nil {
msdw.markAsWillFail(rec, err)
Expand Down
8 changes: 4 additions & 4 deletions go/vt/worker/split_clone.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,23 @@ limitations under the License.
package worker

import (
"context"
"fmt"
"html/template"
"strings"
"sync"
"time"

"context"

"vitess.io/vitess/go/event"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/throttler"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/topotools"
"vitess.io/vitess/go/vt/vtctl/schematools"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/vindexes"
"vitess.io/vitess/go/vt/vttablet/tabletconn"
Expand All @@ -44,6 +43,7 @@ import (
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/proto/vtrpc"
)

// cloneType specifies whether it is a horizontal resharding or a vertical split.
Expand Down Expand Up @@ -1320,7 +1320,7 @@ func (scw *SplitCloneWorker) getSourceSchema(ctx context.Context, tablet *topoda
// in each source shard for each table to be about the same
// (rowCount is used to estimate an ETA)
shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
sourceSchemaDefinition, err := scw.wr.GetSchema(shortCtx, tablet.Alias, scw.tables, scw.excludeTables, false /* includeViews */)
sourceSchemaDefinition, err := schematools.GetSchema(shortCtx, scw.wr.TopoServer(), scw.wr.TabletManagerClient(), tablet.Alias, scw.tables, scw.excludeTables, false /* includeViews */)
cancel()
if err != nil {
return nil, vterrors.Wrapf(err, "cannot get schema from source %v", topoproto.TabletAliasString(tablet.Alias))
Expand Down
17 changes: 8 additions & 9 deletions go/vt/worker/split_diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,26 @@ limitations under the License.
package worker

import (
"context"
"html/template"
"sort"
"sync"

"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"

"context"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/mysqlctl/tmutils"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtctl/schematools"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/vindexes"
"vitess.io/vitess/go/vt/wrangler"

tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/proto/vtrpc"
)

// SplitDiffWorker executes a diff between a destination shard and its
Expand Down Expand Up @@ -404,8 +403,8 @@ func (sdw *SplitDiffWorker) diff(ctx context.Context) error {
go func() {
var err error
shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
sdw.destinationSchemaDefinition, err = sdw.wr.GetSchema(
shortCtx, sdw.destinationAlias, nil /* tables */, sdw.excludeTables, false /* includeViews */)
sdw.destinationSchemaDefinition, err = schematools.GetSchema(
shortCtx, sdw.wr.TopoServer(), sdw.wr.TabletManagerClient(), sdw.destinationAlias, nil /* tables */, sdw.excludeTables, false /* includeViews */)
cancel()
if err != nil {
sdw.markAsWillFail(rec, err)
Expand All @@ -417,8 +416,8 @@ func (sdw *SplitDiffWorker) diff(ctx context.Context) error {
go func() {
var err error
shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
sdw.sourceSchemaDefinition, err = sdw.wr.GetSchema(
shortCtx, sdw.sourceAlias, nil /* tables */, sdw.excludeTables, false /* includeViews */)
sdw.sourceSchemaDefinition, err = schematools.GetSchema(
shortCtx, sdw.wr.TopoServer(), sdw.wr.TabletManagerClient(), sdw.sourceAlias, nil /* tables */, sdw.excludeTables, false /* includeViews */)
cancel()
if err != nil {
sdw.markAsWillFail(rec, err)
Expand Down
Loading

0 comments on commit 8bda378

Please sign in to comment.