Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[schematools] Move more functions out of wrangler to package importable by grpcvtctldserver #9123

Merged
merged 4 commits into from
Nov 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions go/vt/vtctl/grpcvtctldserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -846,21 +846,15 @@ func (s *VtctldServer) GetSchema(ctx context.Context, req *vtctldatapb.GetSchema
defer span.Finish()

span.Annotate("tablet_alias", topoproto.TabletAliasString(req.TabletAlias))

tablet, err := s.ts.GetTablet(ctx, req.TabletAlias)
if err != nil {
return nil, fmt.Errorf("GetTablet(%v) failed: %w", req.TabletAlias, err)
}

span.Annotate("tables", strings.Join(req.Tables, ","))
span.Annotate("exclude_tables", strings.Join(req.ExcludeTables, ","))
span.Annotate("include_views", req.IncludeViews)
span.Annotate("table_names_only", req.TableNamesOnly)
span.Annotate("table_sizes_only", req.TableSizesOnly)

sd, err := s.tmc.GetSchema(ctx, tablet.Tablet, req.Tables, req.ExcludeTables, req.IncludeViews)
sd, err := schematools.GetSchema(ctx, s.ts, s.tmc, req.TabletAlias, req.Tables, req.ExcludeTables, req.IncludeViews)
if err != nil {
return nil, fmt.Errorf("GetSchema(%v, %v, %v, %v) failed: %w", tablet.Tablet, req.Tables, req.ExcludeTables, req.IncludeViews, err)
return nil, err
}

if req.TableNamesOnly {
Expand Down
91 changes: 91 additions & 0 deletions go/vt/vtctl/schematools/copy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
Copyright 2021 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package schematools

import (
"bytes"
"context"
"fmt"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vttablet/tmclient"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

// CopyShardMetadata copies the contents of the _vt.shard_metadata table from
// the source tablet to the destination tablet.
//
// NOTE: This function assumes that the destination tablet is a primary with
// binary logging enabled, in order to propagate the INSERT statements to any
// replicas in the destination shard.
func CopyShardMetadata(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, source *topodatapb.TabletAlias, dest *topodatapb.TabletAlias) error {
sourceTablet, err := ts.GetTablet(ctx, source)
if err != nil {
return fmt.Errorf("GetTablet(%v) failed: %w", topoproto.TabletAliasString(source), err)
}

destTablet, err := ts.GetTablet(ctx, dest)
if err != nil {
return fmt.Errorf("GetTablet(%v) failed: %w", topoproto.TabletAliasString(dest), err)
}

sql := "SELECT 1 FROM information_schema.tables WHERE table_schema = '_vt' AND table_name = 'shard_metadata'"
presenceResult, err := tmc.ExecuteFetchAsDba(ctx, sourceTablet.Tablet, false, []byte(sql), 1, false, false)
if err != nil {
return fmt.Errorf("ExecuteFetchAsDba(%v, false, %v, 1, false, false) failed: %v", topoproto.TabletAliasString(source), sql, err)
}
if len(presenceResult.Rows) == 0 {
log.Infof("_vt.shard_metadata doesn't exist on the source tablet %v, skipping its copy.", topoproto.TabletAliasString(source))
return nil
}

// (TODO|@ajm188,@deepthi): 100 may be too low here for row limit
sql = "SELECT db_name, name, value FROM _vt.shard_metadata"
p3qr, err := tmc.ExecuteFetchAsDba(ctx, sourceTablet.Tablet, false, []byte(sql), 100, false, false)
if err != nil {
return fmt.Errorf("ExecuteFetchAsDba(%v, false, %v, 100, false, false) failed: %v", topoproto.TabletAliasString(source), sql, err)
}

qr := sqltypes.Proto3ToResult(p3qr)
queryBuf := bytes.NewBuffer(nil)
for _, row := range qr.Rows {
dbName := row[0]
name := row[1]
value := row[2]
queryBuf.WriteString("INSERT INTO _vt.shard_metadata (db_name, name, value) VALUES (")
dbName.EncodeSQL(queryBuf)
queryBuf.WriteByte(',')
name.EncodeSQL(queryBuf)
queryBuf.WriteByte(',')
value.EncodeSQL(queryBuf)
queryBuf.WriteString(") ON DUPLICATE KEY UPDATE value = ")
value.EncodeSQL(queryBuf)

_, err := tmc.ExecuteFetchAsDba(ctx, destTablet.Tablet, false, queryBuf.Bytes(), 0, false, false)
if err != nil {
return fmt.Errorf("ExecuteFetchAsDba(%v, false, %v, 0, false, false) failed: %v", topoproto.TabletAliasString(dest), queryBuf.String(), err)
}

queryBuf.Reset()
}

return nil
}
55 changes: 55 additions & 0 deletions go/vt/vtctl/schematools/diff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
Copyright 2021 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package schematools

import (
"context"
"fmt"

"vitess.io/vitess/go/vt/mysqlctl/tmutils"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vttablet/tmclient"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

// CompareSchemas returns (nil, nil) if the schema of the two tablets match. If
// there are diffs, they are returned as (diffs []string, nil).
//
// If fetching the schema for either tablet fails, a non-nil error is returned.
func CompareSchemas(
ctx context.Context,
ts *topo.Server,
tmc tmclient.TabletManagerClient,
source *topodatapb.TabletAlias,
dest *topodatapb.TabletAlias,
tables []string,
excludeTables []string,
includeViews bool,
) (diffs []string, err error) {
sourceSchema, err := GetSchema(ctx, ts, tmc, source, tables, excludeTables, includeViews)
if err != nil {
return nil, fmt.Errorf("failed to get schema from tablet %v. err: %v", source, err)
}

destSchema, err := GetSchema(ctx, ts, tmc, dest, tables, excludeTables, includeViews)
if err != nil {
return nil, fmt.Errorf("failed to get schema from tablet %v. err: %v", dest, err)
}

return tmutils.DiffSchemaToArray("source", sourceSchema, "dest", destSchema), nil
}
45 changes: 45 additions & 0 deletions go/vt/vtctl/schematools/schematools.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
Copyright 2021 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package schematools

import (
"context"

"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tmclient"

tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/proto/vtrpc"
)

// GetSchema makes an RPC to get the schema from a remote tablet, after
// verifying a tablet with that alias exists in the topo.
func GetSchema(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, alias *topodatapb.TabletAlias, tables []string, excludeTables []string, includeViews bool) (*tabletmanagerdatapb.SchemaDefinition, error) {
ti, err := ts.GetTablet(ctx, alias)
if err != nil {
return nil, vterrors.Errorf(vtrpc.Code_NOT_FOUND, "GetTablet(%v) failed: %v", alias, err)
}

sd, err := tmc.GetSchema(ctx, ti.Tablet, tables, excludeTables, includeViews)
if err != nil {
return nil, vterrors.Wrapf(err, "GetSchema(%v, %v, %v, %v) failed: %v", ti.Tablet, tables, excludeTables, includeViews, err)
}

return sd, nil
}
27 changes: 13 additions & 14 deletions go/vt/worker/multi_split_diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,28 @@ limitations under the License.
package worker

import (
"context"
"fmt"
"html/template"
"sort"
"sync"
"time"

"context"

"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vttablet/queryservice"
"vitess.io/vitess/go/vt/vttablet/tabletconn"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/mysqlctl/tmutils"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtctl/schematools"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/vindexes"
"vitess.io/vitess/go/vt/vttablet/queryservice"
"vitess.io/vitess/go/vt/vttablet/tabletconn"
"vitess.io/vitess/go/vt/wrangler"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/vindexes"
"vitess.io/vitess/go/vt/proto/vtrpc"
)

// Scanners encapsulates a source and a destination. We create one of these per parallel runner.
Expand Down Expand Up @@ -761,8 +760,8 @@ func (msdw *MultiSplitDiffWorker) gatherSchemaInfo(ctx context.Context) ([]*tabl
go func(i int, destinationAlias *topodatapb.TabletAlias) {
var err error
shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
destinationSchemaDefinition, err := msdw.wr.GetSchema(
shortCtx, destinationAlias, nil /* tables */, msdw.excludeTables, false /* includeViews */)
destinationSchemaDefinition, err := schematools.GetSchema(
shortCtx, msdw.wr.TopoServer(), msdw.wr.TabletManagerClient(), destinationAlias, nil /* tables */, msdw.excludeTables, false /* includeViews */)
cancel()
if err != nil {
msdw.markAsWillFail(rec, err)
Expand All @@ -776,8 +775,8 @@ func (msdw *MultiSplitDiffWorker) gatherSchemaInfo(ctx context.Context) ([]*tabl
go func() {
var err error
shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
sourceSchemaDefinition, err = msdw.wr.GetSchema(
shortCtx, msdw.sourceAlias, nil /* tables */, msdw.excludeTables, false /* includeViews */)
sourceSchemaDefinition, err = schematools.GetSchema(
shortCtx, msdw.wr.TopoServer(), msdw.wr.TabletManagerClient(), msdw.sourceAlias, nil /* tables */, msdw.excludeTables, false /* includeViews */)
cancel()
if err != nil {
msdw.markAsWillFail(rec, err)
Expand Down
8 changes: 4 additions & 4 deletions go/vt/worker/split_clone.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,23 @@ limitations under the License.
package worker

import (
"context"
"fmt"
"html/template"
"strings"
"sync"
"time"

"context"

"vitess.io/vitess/go/event"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/throttler"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/topotools"
"vitess.io/vitess/go/vt/vtctl/schematools"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/vindexes"
"vitess.io/vitess/go/vt/vttablet/tabletconn"
Expand All @@ -44,6 +43,7 @@ import (
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/proto/vtrpc"
)

// cloneType specifies whether it is a horizontal resharding or a vertical split.
Expand Down Expand Up @@ -1320,7 +1320,7 @@ func (scw *SplitCloneWorker) getSourceSchema(ctx context.Context, tablet *topoda
// in each source shard for each table to be about the same
// (rowCount is used to estimate an ETA)
shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
sourceSchemaDefinition, err := scw.wr.GetSchema(shortCtx, tablet.Alias, scw.tables, scw.excludeTables, false /* includeViews */)
sourceSchemaDefinition, err := schematools.GetSchema(shortCtx, scw.wr.TopoServer(), scw.wr.TabletManagerClient(), tablet.Alias, scw.tables, scw.excludeTables, false /* includeViews */)
cancel()
if err != nil {
return nil, vterrors.Wrapf(err, "cannot get schema from source %v", topoproto.TabletAliasString(tablet.Alias))
Expand Down
17 changes: 8 additions & 9 deletions go/vt/worker/split_diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,26 @@ limitations under the License.
package worker

import (
"context"
"html/template"
"sort"
"sync"

"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"

"context"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/mysqlctl/tmutils"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtctl/schematools"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/vindexes"
"vitess.io/vitess/go/vt/wrangler"

tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/proto/vtrpc"
)

// SplitDiffWorker executes a diff between a destination shard and its
Expand Down Expand Up @@ -404,8 +403,8 @@ func (sdw *SplitDiffWorker) diff(ctx context.Context) error {
go func() {
var err error
shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
sdw.destinationSchemaDefinition, err = sdw.wr.GetSchema(
shortCtx, sdw.destinationAlias, nil /* tables */, sdw.excludeTables, false /* includeViews */)
sdw.destinationSchemaDefinition, err = schematools.GetSchema(
shortCtx, sdw.wr.TopoServer(), sdw.wr.TabletManagerClient(), sdw.destinationAlias, nil /* tables */, sdw.excludeTables, false /* includeViews */)
cancel()
if err != nil {
sdw.markAsWillFail(rec, err)
Expand All @@ -417,8 +416,8 @@ func (sdw *SplitDiffWorker) diff(ctx context.Context) error {
go func() {
var err error
shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
sdw.sourceSchemaDefinition, err = sdw.wr.GetSchema(
shortCtx, sdw.sourceAlias, nil /* tables */, sdw.excludeTables, false /* includeViews */)
sdw.sourceSchemaDefinition, err = schematools.GetSchema(
shortCtx, sdw.wr.TopoServer(), sdw.wr.TabletManagerClient(), sdw.sourceAlias, nil /* tables */, sdw.excludeTables, false /* includeViews */)
cancel()
if err != nil {
sdw.markAsWillFail(rec, err)
Expand Down
Loading