From f08b35fa178a0c38361fec9f1a4d08e470cf7dcd Mon Sep 17 00:00:00 2001 From: Andrew Mason Date: Wed, 26 May 2021 08:41:28 -0400 Subject: [PATCH 1/2] Remove wrangler wrappers around topotools functions Signed-off-by: Andrew Mason --- go/vt/wrangler/materializer.go | 6 ++++-- go/vt/wrangler/traffic_switcher.go | 24 +++++++-------------- go/vt/wrangler/traffic_switcher_env_test.go | 2 +- go/vt/wrangler/traffic_switcher_test.go | 10 ++++----- 4 files changed, 18 insertions(+), 24 deletions(-) diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index 26776a53644..f927e75de9a 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -30,6 +30,8 @@ import ( "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/mysqlctl/tmutils" querypb "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/topotools" + "vitess.io/vitess/go/vt/vtctl/workflow" "vitess.io/vitess/go/vt/vtgate/evalengine" "context" @@ -158,7 +160,7 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta if externalTopo == nil { // Save routing rules before vschema. If we save vschema first, and routing rules // fails to save, we may generate duplicate table errors. - rules, err := wr.getRoutingRules(ctx) + rules, err := topotools.GetRoutingRules(ctx, wr.ts) if err != nil { return err } @@ -174,7 +176,7 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta rules[sourceKeyspace+"."+table+"@replica"] = toSource rules[sourceKeyspace+"."+table+"@rdonly"] = toSource } - if err := wr.saveRoutingRules(ctx, rules); err != nil { + if err := topotools.SaveRoutingRules(ctx, wr.ts, rules); err != nil { return err } if vschema != nil { diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index b96ac7a0a4a..8c4379df652 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -306,7 +306,7 @@ func (wr *Wrangler) getWorkflowState(ctx context.Context, targetKeyspace, workfl return nil, nil, err } ws.ReplicaCellsNotSwitched, ws.ReplicaCellsSwitched = cellsNotSwitched, cellsSwitched - rules, err := ts.wr.getRoutingRules(ctx) + rules, err := topotools.GetRoutingRules(ctx, ts.wr.ts) if err != nil { return nil, nil, err } @@ -945,7 +945,7 @@ func (ts *trafficSwitcher) compareShards(ctx context.Context, keyspace string, s func (ts *trafficSwitcher) switchTableReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, direction TrafficSwitchDirection) error { log.Infof("switchTableReads: servedTypes: %+v, direction %t", servedTypes, direction) - rules, err := ts.wr.getRoutingRules(ctx) + rules, err := topotools.GetRoutingRules(ctx, ts.wr.ts) if err != nil { return err } @@ -972,7 +972,7 @@ func (ts *trafficSwitcher) switchTableReads(ctx context.Context, cells []string, } } } - if err := ts.wr.saveRoutingRules(ctx, rules); err != nil { + if err := topotools.SaveRoutingRules(ctx, ts.wr.ts, rules); err != nil { return err } return ts.wr.ts.RebuildSrvVSchema(ctx, cells) @@ -1333,7 +1333,7 @@ func (ts *trafficSwitcher) changeRouting(ctx context.Context) error { } func (ts *trafficSwitcher) changeWriteRoute(ctx context.Context) error { - rules, err := ts.wr.getRoutingRules(ctx) + rules, err := topotools.GetRoutingRules(ctx, ts.wr.ts) if err != nil { return err } @@ -1344,7 +1344,7 @@ func (ts *trafficSwitcher) changeWriteRoute(ctx context.Context) error { rules[ts.sourceKeyspace+"."+table] = []string{ts.targetKeyspace + "." + table} ts.wr.Logger().Infof("Add routing: %v %v", table, ts.sourceKeyspace+"."+table) } - if err := ts.wr.saveRoutingRules(ctx, rules); err != nil { + if err := topotools.SaveRoutingRules(ctx, ts.wr.ts, rules); err != nil { return err } return ts.wr.ts.RebuildSrvVSchema(ctx, nil) @@ -1515,7 +1515,7 @@ func doValidateWorkflowHasCompleted(ctx context.Context, ts *trafficSwitcher) er //check if table is routable wg.Wait() if ts.migrationType == binlogdatapb.MigrationType_TABLES { - rules, err := ts.wr.getRoutingRules(ctx) + rules, err := topotools.GetRoutingRules(ctx, ts.wr.ts) if err != nil { rec.RecordError(fmt.Errorf("could not get RoutingRules")) } @@ -1665,7 +1665,7 @@ func (ts *trafficSwitcher) dropTargetShards(ctx context.Context) error { } func (ts *trafficSwitcher) deleteRoutingRules(ctx context.Context) error { - rules, err := ts.wr.getRoutingRules(ctx) + rules, err := topotools.GetRoutingRules(ctx, ts.wr.ts) if err != nil { return err } @@ -1680,20 +1680,12 @@ func (ts *trafficSwitcher) deleteRoutingRules(ctx context.Context) error { delete(rules, ts.sourceKeyspace+"."+table+"@replica") delete(rules, ts.sourceKeyspace+"."+table+"@rdonly") } - if err := ts.wr.saveRoutingRules(ctx, rules); err != nil { + if err := topotools.SaveRoutingRules(ctx, ts.wr.ts, rules); err != nil { return err } return nil } -func (wr *Wrangler) getRoutingRules(ctx context.Context) (map[string][]string, error) { - return topotools.GetRoutingRules(ctx, wr.ts) -} - -func (wr *Wrangler) saveRoutingRules(ctx context.Context, rules map[string][]string) error { - return topotools.SaveRoutingRules(ctx, wr.ts, rules) -} - // addParticipatingTablesToKeyspace updates the vschema with the new tables that were created as part of the // Migrate flow. It is called when the Migrate flow is Completed func (ts *trafficSwitcher) addParticipatingTablesToKeyspace(ctx context.Context, keyspace, tableSpecs string) error { diff --git a/go/vt/wrangler/traffic_switcher_env_test.go b/go/vt/wrangler/traffic_switcher_env_test.go index fc025ff531d..d9e49802f87 100644 --- a/go/vt/wrangler/traffic_switcher_env_test.go +++ b/go/vt/wrangler/traffic_switcher_env_test.go @@ -220,7 +220,7 @@ func newTestTableMigraterCustom(ctx context.Context, t *testing.T, sourceShards, ) } - if err := tme.wr.saveRoutingRules(ctx, map[string][]string{ + if err := topotools.SaveRoutingRules(ctx, tme.wr.ts, map[string][]string{ "t1": {"ks1.t1"}, "ks2.t1": {"ks1.t1"}, "t2": {"ks1.t2"}, diff --git a/go/vt/wrangler/traffic_switcher_test.go b/go/vt/wrangler/traffic_switcher_test.go index e046ff8985e..691b41f5caf 100644 --- a/go/vt/wrangler/traffic_switcher_test.go +++ b/go/vt/wrangler/traffic_switcher_test.go @@ -17,6 +17,7 @@ limitations under the License. package wrangler import ( + "context" "errors" "fmt" "reflect" @@ -25,15 +26,14 @@ import ( "time" "github.com/google/go-cmp/cmp" - "github.com/stretchr/testify/require" - "context" - "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topotools" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" - "vitess.io/vitess/go/vt/topo" ) var ( @@ -1737,7 +1737,7 @@ func TestReverseVReplicationUpdateQuery(t *testing.T) { func checkRouting(t *testing.T, wr *Wrangler, want map[string][]string) { t.Helper() ctx := context.Background() - got, err := wr.getRoutingRules(ctx) + got, err := topotools.GetRoutingRules(ctx, wr.ts) if err != nil { t.Fatal(err) } From 5c87fe51738a3c5464541af9a0ccc1ebf1d56b03 Mon Sep 17 00:00:00 2001 From: Andrew Mason Date: Wed, 26 May 2021 11:20:07 -0400 Subject: [PATCH 2/2] Migrate `CheckReshardingJournalExistsOnTablet` from wrangler to package workflow I also removed the unnecessary `if !exists` check, since not taking that path was impossible. Signed-off-by: Andrew Mason --- go/vt/vtctl/workflow/server.go | 33 +++++- go/vt/vtctl/workflow/server_test.go | 165 ++++++++++++++++++++++++++++ go/vt/wrangler/materializer.go | 12 +- go/vt/wrangler/traffic_switcher.go | 32 +----- 4 files changed, 211 insertions(+), 31 deletions(-) create mode 100644 go/vt/vtctl/workflow/server_test.go diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index faac7d9efd8..884b9f22c3b 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -24,7 +24,6 @@ import ( "time" "google.golang.org/protobuf/encoding/prototext" - "k8s.io/apimachinery/pkg/util/sets" "vitess.io/vitess/go/sqltypes" @@ -34,6 +33,7 @@ import ( "vitess.io/vitess/go/vt/vttablet/tmclient" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" "vitess.io/vitess/go/vt/proto/vttime" ) @@ -73,6 +73,37 @@ func NewServer(ts *topo.Server, tmc tmclient.TabletManagerClient) *Server { } } +// CheckReshardingJournalExistsOnTablet returns the journal (or an empty +// journal) and a boolean to indicate if the resharding_journal table exists on +// the given tablet. +// +// (TODO:@ajm188) This should not be part of the final public API, and should +// be un-exported after all places in package wrangler that call this have been +// migrated over. +func (s *Server) CheckReshardingJournalExistsOnTablet(ctx context.Context, tablet *topodatapb.Tablet, migrationID int64) (*binlogdatapb.Journal, bool, error) { + var ( + journal binlogdatapb.Journal + exists bool + ) + + query := fmt.Sprintf("select val from _vt.resharding_journal where id=%v", migrationID) + p3qr, err := s.tmc.VReplicationExec(ctx, tablet, query) + if err != nil { + return nil, false, err + } + + if len(p3qr.Rows) != 0 { + qr := sqltypes.Proto3ToResult(p3qr) + if err := prototext.Unmarshal(qr.Rows[0][0].ToBytes(), &journal); err != nil { + return nil, false, err + } + + exists = true + } + + return &journal, exists, nil +} + // GetWorkflows returns a list of all workflows that exist in a given keyspace, // with some additional filtering depending on the request parameters (for // example, ActiveOnly=true restricts the search to only workflows that are diff --git a/go/vt/vtctl/workflow/server_test.go b/go/vt/vtctl/workflow/server_test.go new file mode 100644 index 00000000000..85c60336351 --- /dev/null +++ b/go/vt/vtctl/workflow/server_test.go @@ -0,0 +1,165 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workflow + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/encoding/prototext" + + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/test/utils" + "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/vttablet/tmclient" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" +) + +type fakeTMC struct { + tmclient.TabletManagerClient + vrepQueriesByTablet map[string]map[string]*querypb.QueryResult +} + +func (fake *fakeTMC) VReplicationExec(ctx context.Context, tablet *topodatapb.Tablet, query string) (*querypb.QueryResult, error) { + alias := topoproto.TabletAliasString(tablet.Alias) + tabletQueries, ok := fake.vrepQueriesByTablet[alias] + if !ok { + return nil, fmt.Errorf("no query map registered on fake for %s", alias) + } + + p3qr, ok := tabletQueries[query] + if !ok { + return nil, fmt.Errorf("no result on fake for query %q on tablet %s", query, alias) + } + + return p3qr, nil +} + +func TestCheckReshardingJournalExistsOnTablet(t *testing.T) { + t.Parallel() + + ctx := context.Background() + tablet := &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "zone1", + Uid: 100, + }, + } + journal := &binlogdatapb.Journal{ + Id: 1, + MigrationType: binlogdatapb.MigrationType_SHARDS, + Tables: []string{"t1", "t2"}, + } + journalBytes, err := prototext.Marshal(journal) + require.NoError(t, err, "could not marshal journal %+v into bytes", journal) + + // get some bytes that will fail to unmarshal into a binlogdatapb.Journal + tabletBytes, err := prototext.Marshal(tablet) + require.NoError(t, err, "could not marshal tablet %+v into bytes", tablet) + + p3qr := sqltypes.ResultToProto3(sqltypes.MakeTestResult([]*querypb.Field{ + { + Name: "val", + Type: querypb.Type_BLOB, + }, + }, string(journalBytes))) + + tests := []struct { + name string + tablet *topodatapb.Tablet + result *querypb.QueryResult + journal *binlogdatapb.Journal + shouldExist bool + shouldErr bool + }{ + { + name: "journal exists", + tablet: tablet, + result: p3qr, + shouldExist: true, + journal: journal, + }, + { + name: "journal does not exist", + tablet: tablet, + result: sqltypes.ResultToProto3(sqltypes.MakeTestResult(nil)), + journal: &binlogdatapb.Journal{}, + shouldExist: false, + }, + { + name: "cannot unmarshal into journal", + tablet: tablet, + result: sqltypes.ResultToProto3(sqltypes.MakeTestResult([]*querypb.Field{ + { + Name: "val", + Type: querypb.Type_BLOB, + }, + }, string(tabletBytes))), + shouldErr: true, + }, + { + name: "VReplicationExec fails on tablet", + tablet: &topodatapb.Tablet{ // Here we use a different tablet to force the fake to return an error + Alias: &topodatapb.TabletAlias{ + Cell: "zone2", + Uid: 200, + }, + }, + shouldErr: true, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + tmc := &fakeTMC{ + vrepQueriesByTablet: map[string]map[string]*querypb.QueryResult{ + topoproto.TabletAliasString(tablet.Alias): { // always use the tablet shared by these tests cases + "select val from _vt.resharding_journal where id=1": tt.result, + }, + }, + } + + ws := NewServer(nil, tmc) + journal, exists, err := ws.CheckReshardingJournalExistsOnTablet(ctx, tt.tablet, 1) + if tt.shouldErr { + assert.Error(t, err) + return + } + + require.NoError(t, err) + + existAssertionMsg := "expected journal to " + if tt.shouldExist { + existAssertionMsg += "already exist on tablet" + } else { + existAssertionMsg += "not exist" + } + + assert.Equal(t, tt.shouldExist, exists, existAssertionMsg) + utils.MustMatch(t, tt.journal, journal, "journal in resharding_journal did not match") + }) + } +} diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index f927e75de9a..163e300ea9a 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -313,9 +313,13 @@ func (wr *Wrangler) checkIfPreviousJournalExists(ctx context.Context, mz *materi return allErrors.AggrError(vterrors.Aggregate) } - var mu sync.Mutex - var exists bool - var tablets []string + var ( + mu sync.Mutex + exists bool + tablets []string + ws = workflow.NewServer(wr.ts, wr.tmc) + ) + err := forAllSources(func(si *topo.ShardInfo) error { tablet, err := wr.ts.GetTablet(ctx, si.MasterAlias) if err != nil { @@ -324,7 +328,7 @@ func (wr *Wrangler) checkIfPreviousJournalExists(ctx context.Context, mz *materi if tablet == nil { return nil } - _, exists, err = wr.checkIfJournalExistsOnTablet(ctx, tablet.Tablet, migrationID) + _, exists, err = ws.CheckReshardingJournalExistsOnTablet(ctx, tablet.Tablet, migrationID) if err != nil { return err } diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index 8c4379df652..fc34a5af588 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -26,10 +26,7 @@ import ( "sync" "time" - "google.golang.org/protobuf/encoding/prototext" - "vitess.io/vitess/go/json2" - "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/key" @@ -1012,35 +1009,18 @@ func (ts *trafficSwitcher) switchShardReads(ctx context.Context, cells []string, return nil } -func (wr *Wrangler) checkIfJournalExistsOnTablet(ctx context.Context, tablet *topodatapb.Tablet, migrationID int64) (*binlogdatapb.Journal, bool, error) { - var exists bool - journal := &binlogdatapb.Journal{} - query := fmt.Sprintf("select val from _vt.resharding_journal where id=%v", migrationID) - p3qr, err := wr.tmc.VReplicationExec(ctx, tablet, query) - if err != nil { - return nil, false, err - } - if len(p3qr.Rows) != 0 { - qr := sqltypes.Proto3ToResult(p3qr) - if !exists { - if err := prototext.Unmarshal(qr.Rows[0][0].ToBytes(), journal); err != nil { - return nil, false, err - } - exists = true - } - } - return journal, exists, nil - -} - // checkJournals returns true if at least one journal has been created. // If so, it also returns the list of sourceWorkflows that need to be switched. func (ts *trafficSwitcher) checkJournals(ctx context.Context) (journalsExist bool, sourceWorkflows []string, err error) { - var mu sync.Mutex + var ( + ws = workflow.NewServer(ts.wr.ts, ts.wr.tmc) + mu sync.Mutex + ) + err = ts.forAllSources(func(source *workflow.MigrationSource) error { mu.Lock() defer mu.Unlock() - journal, exists, err := ts.wr.checkIfJournalExistsOnTablet(ctx, source.GetPrimary().Tablet, ts.id) + journal, exists, err := ws.CheckReshardingJournalExistsOnTablet(ctx, source.GetPrimary().Tablet, ts.id) if err != nil { return err }