Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Topo to topo compare #4392

Merged
merged 5 commits into from
Dec 16, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions go/cmd/topo2topo/topo2topo.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package main

import (
"flag"
"fmt"
"os"

"golang.org/x/net/context"
"vitess.io/vitess/go/exit"
Expand All @@ -36,6 +38,7 @@ var (
toServerAddress = flag.String("to_server", "", "topology server address to copy data to")
toRoot = flag.String("to_root", "", "topology server root to copy data to")

compare = flag.Bool("compare", false, "compares data between topologies")
doKeyspaces = flag.Bool("do-keyspaces", false, "copies the keyspace information")
doShards = flag.Bool("do-shards", false, "copies the shard information")
doShardReplications = flag.Bool("do-shard-replications", false, "copies the shard replication information")
Expand Down Expand Up @@ -64,6 +67,14 @@ func main() {

ctx := context.Background()

if *compare {
compareTopos(ctx, fromTS, toTS)
return
}
copy(ctx, fromTS, toTS)
}

func copy(ctx context.Context, fromTS, toTS *topo.Server) {
rafael marked this conversation as resolved.
Show resolved Hide resolved
if *doKeyspaces {
helpers.CopyKeyspaces(ctx, fromTS, toTS)
}
Expand All @@ -76,4 +87,37 @@ func main() {
if *doTablets {
helpers.CopyTablets(ctx, fromTS, toTS)
}

}

func compareTopos(ctx context.Context, fromTS, toTS *topo.Server) {
var err error
if *doKeyspaces {
err = helpers.CompareKeyspaces(ctx, fromTS, toTS)
if err != nil {
log.Exitf("Compare keyspaces failed: %v", err)
}
}
if *doShards {
err = helpers.CompareShards(ctx, fromTS, toTS)
if err != nil {
log.Exitf("Compare shards failed: %v", err)
}
}
if *doShardReplications {
err = helpers.CompareShardReplications(ctx, fromTS, toTS)
if err != nil {
log.Exitf("Compare shard replications failed: %v", err)
}
}
if *doTablets {
err = helpers.CompareTablets(ctx, fromTS, toTS)
if err != nil {
log.Exitf("Compare tablets failed: %v", err)
}
}
if err == nil {
fmt.Println("Topologies are in sync")
os.Exit(0)
}
}
261 changes: 261 additions & 0 deletions go/vt/topo/helpers/compare.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
/*
Copyright 2018 The Vitess Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package helpers contains a few utility classes to handle topo.Server
// objects, and transitions from one topo implementation to another.
package helpers

import (
"fmt"
"reflect"
"sync"

"golang.org/x/net/context"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/topo"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

// CompareKeyspaces will create the keyspaces in the destination topo.
rafael marked this conversation as resolved.
Show resolved Hide resolved
func CompareKeyspaces(ctx context.Context, fromTS, toTS *topo.Server) error {
keyspaces, err := fromTS.GetKeyspaces(ctx)
if err != nil {
return fmt.Errorf("GetKeyspace(%v): %v", keyspaces, err)
}

wg := sync.WaitGroup{}
rec := concurrency.AllErrorRecorder{}
for _, keyspace := range keyspaces {
wg.Add(1)
go func(keyspace string) {
defer wg.Done()

fromKs, err := fromTS.GetKeyspace(ctx, keyspace)
if err != nil {
rec.RecordError(fmt.Errorf("GetKeyspace(%v): %v", keyspace, err))
return
}

toKs, err := toTS.GetKeyspace(ctx, keyspace)
if err != nil {
rec.RecordError(fmt.Errorf("GetKeyspace(%v): %v", keyspace, err))
return
}

if !reflect.DeepEqual(fromKs.Keyspace, toKs.Keyspace) {
rec.RecordError(fmt.Errorf("Keyspace: %v does not match between from and to topology", keyspace))
return
}

fromVs, err := fromTS.GetVSchema(ctx, keyspace)
switch {
case err == nil:
// Nothing to do.
case topo.IsErrType(err, topo.NoNode):
// Nothing to do.
default:
rec.RecordError(fmt.Errorf("GetVSchema(%v): %v", keyspace, err))
return
}

toVs, err := toTS.GetVSchema(ctx, keyspace)
switch {
case err == nil:
// Nothing to do.
case topo.IsErrType(err, topo.NoNode):
// Nothing to do.
default:
rec.RecordError(fmt.Errorf("GetVSchema(%v): %v", keyspace, err))
return
}

if !reflect.DeepEqual(fromVs, toVs) {
rec.RecordError(fmt.Errorf("Vschema for keyspace: %v does not match between from and to topology", keyspace))
return
}
}(keyspace)
}
wg.Wait()
if rec.HasErrors() {
return fmt.Errorf("compareKeyspaces failed: %v", rec.Error())
}
return nil
}

// CompareShards will create the shards in the destination topo.
rafael marked this conversation as resolved.
Show resolved Hide resolved
func CompareShards(ctx context.Context, fromTS, toTS *topo.Server) error {
keyspaces, err := fromTS.GetKeyspaces(ctx)
if err != nil {
return fmt.Errorf("fromTS.GetKeyspaces: %v", err)
}

wg := sync.WaitGroup{}
rec := concurrency.AllErrorRecorder{}
for _, keyspace := range keyspaces {
wg.Add(1)
go func(keyspace string) {
defer wg.Done()
shards, err := fromTS.GetShardNames(ctx, keyspace)
if err != nil {
rec.RecordError(fmt.Errorf("GetShardNames(%v): %v", keyspace, err))
return
}

for _, shard := range shards {
wg.Add(1)
go func(keyspace, shard string) {
rafael marked this conversation as resolved.
Show resolved Hide resolved
defer wg.Done()

fromSi, err := fromTS.GetShard(ctx, keyspace, shard)
if err != nil {
rec.RecordError(fmt.Errorf("GetShard(%v, %v): %v", keyspace, shard, err))
return
}
toSi, err := toTS.GetShard(ctx, keyspace, shard)
if err != nil {
rec.RecordError(fmt.Errorf("GetShard(%v, %v): %v", keyspace, shard, err))
return
}

if !reflect.DeepEqual(fromSi.Shard, toSi.Shard) {
rec.RecordError(fmt.Errorf("Shard %v for keyspace: %v does not match between from and to topology", shard, keyspace))
return
}
}(keyspace, shard)
}
}(keyspace)
}
wg.Wait()
if rec.HasErrors() {
return fmt.Errorf("compareShards failed: %v", rec.Error())
}
return nil
}

// CompareTablets will create the tablets in the destination topo.
rafael marked this conversation as resolved.
Show resolved Hide resolved
func CompareTablets(ctx context.Context, fromTS, toTS *topo.Server) error {
cells, err := fromTS.GetKnownCells(ctx)
if err != nil {
return fmt.Errorf("fromTS.GetKnownCells: %v", err)
}

wg := sync.WaitGroup{}
rec := concurrency.AllErrorRecorder{}
for _, cell := range cells {
wg.Add(1)
go func(cell string) {
defer wg.Done()
tabletAliases, err := fromTS.GetTabletsByCell(ctx, cell)
if err != nil {
rec.RecordError(fmt.Errorf("GetTabletsByCell(%v): %v", cell, err))
} else {
for _, tabletAlias := range tabletAliases {
wg.Add(1)
go func(tabletAlias *topodatapb.TabletAlias) {
defer wg.Done()

// read the source tablet
fromTi, err := fromTS.GetTablet(ctx, tabletAlias)
if err != nil {
rec.RecordError(fmt.Errorf("GetTablet(%v): %v", tabletAlias, err))
return
}
toTi, err := toTS.GetTablet(ctx, tabletAlias)
if err != nil {
rec.RecordError(fmt.Errorf("GetTablet(%v): %v", tabletAlias, err))
return
}
if !reflect.DeepEqual(fromTi.Tablet, toTi.Tablet) {
rec.RecordError(fmt.Errorf("Tablet %v: does not match between from and to topology", tabletAlias))
return
}
}(tabletAlias)
}
}
}(cell)
}
wg.Wait()
if rec.HasErrors() {
return fmt.Errorf("compareTablets failed: %v", rec.Error())
}
return nil
}

// CompareShardReplications will create the ShardReplication objects in
rafael marked this conversation as resolved.
Show resolved Hide resolved
// the destination topo.
func CompareShardReplications(ctx context.Context, fromTS, toTS *topo.Server) error {
keyspaces, err := fromTS.GetKeyspaces(ctx)
if err != nil {
return fmt.Errorf("fromTS.GetKeyspaces: %v", err)
}

wg := sync.WaitGroup{}
rec := concurrency.AllErrorRecorder{}
for _, keyspace := range keyspaces {
wg.Add(1)
go func(keyspace string) {
defer wg.Done()
shards, err := fromTS.GetShardNames(ctx, keyspace)
if err != nil {
rec.RecordError(fmt.Errorf("GetShardNames(%v): %v", keyspace, err))
return
}

for _, shard := range shards {
wg.Add(1)
go func(keyspace, shard string) {
defer wg.Done()

// read the source shard to get the cells
si, err := fromTS.GetShard(ctx, keyspace, shard)
if err != nil {
rec.RecordError(fmt.Errorf("GetShard(%v, %v): %v", keyspace, shard, err))
return
}

for _, cell := range si.Shard.Cells {
fromSRi, err := fromTS.GetShardReplication(ctx, cell, keyspace, shard)
if err != nil {
rec.RecordError(fmt.Errorf("GetShardReplication(%v, %v, %v): %v", cell, keyspace, shard, err))
continue
}
toSRi, err := toTS.GetShardReplication(ctx, cell, keyspace, shard)
if err != nil {
rec.RecordError(fmt.Errorf("GetShardReplication(%v, %v, %v): %v", cell, keyspace, shard, err))
continue
}
if !reflect.DeepEqual(fromSRi.ShardReplication, toSRi.ShardReplication) {
rec.RecordError(
fmt.Errorf(
"Shard Replication in cell %v, keyspace %v, shard %v: does not match between from and to topology",
cell,
keyspace,
shard),
)
return
}
}
}(keyspace, shard)
}
}(keyspace)
}
wg.Wait()
if rec.HasErrors() {
return fmt.Errorf("compareShardReplication failed: %v", rec.Error())
}
return nil
}
Loading