From 729edf3586902e164061034b94f2531069e4aee5 Mon Sep 17 00:00:00 2001 From: Andrew Mason Date: Tue, 27 Apr 2021 08:25:30 -0400 Subject: [PATCH 1/2] Migrate `UpdateShardRecords`, `RefreshTabletsByShard`, and `{Get,Save}RoutingRules` I am moving these to `package topotools` to reuse this code in both `package grpcvtctldserver` (which wrangler imports) and `package wrangler`. Signed-off-by: Andrew Mason --- go/vt/topotools/keyspace.go | 119 ++++++++++++++++++++++++++ go/vt/topotools/routing_rules.go | 58 +++++++++++++ go/vt/topotools/routing_rules_test.go | 68 +++++++++++++++ go/vt/wrangler/keyspace.go | 66 +------------- go/vt/wrangler/traffic_switcher.go | 20 +---- 5 files changed, 249 insertions(+), 82 deletions(-) create mode 100644 go/vt/topotools/keyspace.go create mode 100644 go/vt/topotools/routing_rules.go create mode 100644 go/vt/topotools/routing_rules_test.go diff --git a/go/vt/topotools/keyspace.go b/go/vt/topotools/keyspace.go new file mode 100644 index 00000000000..a166da169bb --- /dev/null +++ b/go/vt/topotools/keyspace.go @@ -0,0 +1,119 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package topotools + +import ( + "context" + "sync" + "time" + + "vitess.io/vitess/go/vt/logutil" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vttablet/tmclient" + + topodatapb "vitess.io/vitess/go/vt/proto/topodata" +) + +// RefreshTabletsByShard calls RefreshState on all the tablets in a given shard. 
+// +// It only returns errors from looking up the tablet map from the topology; +// errors returned from any RefreshState RPCs are logged and then ignored. Also, +// any tablets without a .Hostname set in the topology are skipped. +func RefreshTabletsByShard(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, si *topo.ShardInfo, cells []string, logger logutil.Logger) error { + logger.Infof("RefreshTabletsByShard called on shard %v/%v", si.Keyspace(), si.ShardName()) + + tabletMap, err := ts.GetTabletMapForShardByCell(ctx, si.Keyspace(), si.ShardName(), cells) + switch { + case err == nil: + // keep going + case topo.IsErrType(err, topo.PartialResult): + logger.Warningf("RefreshTabletsByShard: got partial result for shard %v/%v, may not refresh all tablets everywhere", si.Keyspace(), si.ShardName()) + default: + return err + } + + // Any errors from this point onward are ignored. + var wg sync.WaitGroup + for _, ti := range tabletMap { + if ti.Hostname == "" { + // The tablet is not running, we don't have the host + // name to connect to, so we just skip this tablet. + logger.Infof("Tablet %v has no hostname, skipping its RefreshState", ti.AliasString()) + continue + } + + wg.Add(1) + go func(ti *topo.TabletInfo) { + defer wg.Done() + logger.Infof("Calling RefreshState on tablet %v", ti.AliasString()) + + ctx, cancel := context.WithTimeout(ctx, 60*time.Second) + defer cancel() + + if err := tmc.RefreshState(ctx, ti.Tablet); err != nil { + logger.Warningf("RefreshTabletsByShard: failed to refresh %v: %v", ti.AliasString(), err) + } + }(ti) + } + + wg.Wait() + return nil +} + +// UpdateShardRecords updates the shard records based on 'from' or 'to' +// direction. 
+func UpdateShardRecords( + ctx context.Context, + ts *topo.Server, + tmc tmclient.TabletManagerClient, + keyspace string, + shards []*topo.ShardInfo, + cells []string, + servedType topodatapb.TabletType, + isFrom bool, + clearSourceShards bool, + logger logutil.Logger, +) error { + disableQueryService := isFrom + if err := ts.UpdateDisableQueryService(ctx, keyspace, shards, servedType, cells, disableQueryService); err != nil { + return err + } + + for i, si := range shards { + updatedShard, err := ts.UpdateShardFields(ctx, si.Keyspace(), si.ShardName(), func(si *topo.ShardInfo) error { + if clearSourceShards { + si.SourceShards = nil + } + + return nil + }) + + if err != nil { + return err + } + + shards[i] = updatedShard + + // For 'to' shards, refresh to make them serve. The 'from' shards will + // be refreshed after traffic has migrated. + if !isFrom { + _ = RefreshTabletsByShard(ctx, ts, tmc, si, cells, logger) + } + } + + return nil +} diff --git a/go/vt/topotools/routing_rules.go b/go/vt/topotools/routing_rules.go new file mode 100644 index 00000000000..9fa1ed0061d --- /dev/null +++ b/go/vt/topotools/routing_rules.go @@ -0,0 +1,58 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package topotools + +import ( + "context" + + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/topo" + + vschemapb "vitess.io/vitess/go/vt/proto/vschema" +) + +// GetRoutingRules fetches routing rules from the topology server and returns a +// mapping of fromTable=>[]toTables. +func GetRoutingRules(ctx context.Context, ts *topo.Server) (map[string][]string, error) { + rrs, err := ts.GetRoutingRules(ctx) + if err != nil { + return nil, err + } + + rules := make(map[string][]string, len(rrs.Rules)) + for _, rr := range rrs.Rules { + rules[rr.FromTable] = rr.ToTables + } + + return rules, nil +} + +// SaveRoutingRules converts a mapping of fromTable=>[]toTables into a +// vschemapb.RoutingRules protobuf message and saves it in the topology. +func SaveRoutingRules(ctx context.Context, ts *topo.Server, rules map[string][]string) error { + log.Infof("Saving routing rules %v\n", rules) + + rrs := &vschemapb.RoutingRules{Rules: make([]*vschemapb.RoutingRule, 0, len(rules))} + for from, to := range rules { + rrs.Rules = append(rrs.Rules, &vschemapb.RoutingRule{ + FromTable: from, + ToTables: to, + }) + } + + return ts.SaveRoutingRules(ctx, rrs) +} diff --git a/go/vt/topotools/routing_rules_test.go b/go/vt/topotools/routing_rules_test.go new file mode 100644 index 00000000000..fe868d6754a --- /dev/null +++ b/go/vt/topotools/routing_rules_test.go @@ -0,0 +1,68 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package topotools + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/vt/topo/memorytopo" +) + +func TestRoutingRulesRoundTrip(t *testing.T) { + ctx := context.Background() + ts := memorytopo.NewServer("zone1") + + rules := map[string][]string{ + "t1": {"t2", "t3"}, + "t4": {"t5"}, + } + + err := SaveRoutingRules(ctx, ts, rules) + require.NoError(t, err, "could not save routing rules to topo %v", rules) + + roundtripRules, err := GetRoutingRules(ctx, ts) + require.NoError(t, err, "could not fetch routing rules from topo") + + assert.Equal(t, rules, roundtripRules) +} + +func TestRoutingRulesErrors(t *testing.T) { + ctx := context.Background() + ts, factory := memorytopo.NewServerAndFactory("zone1") + factory.SetError(errors.New("topo failure for testing")) + + t.Run("GetRoutingRules error", func(t *testing.T) { + + rules, err := GetRoutingRules(ctx, ts) + assert.Error(t, err, "expected error from GetRoutingRules, got rules=%v", rules) + }) + + t.Run("SaveRoutingRules error", func(t *testing.T) { + rules := map[string][]string{ + "t1": {"t2", "t3"}, + "t4": {"t5"}, + } + + err := SaveRoutingRules(ctx, ts, rules) + assert.Error(t, err, "expected error from SaveRoutingRules for rules=%v", rules) + }) +} diff --git a/go/vt/wrangler/keyspace.go b/go/vt/wrangler/keyspace.go index 6ecd1b1d774..4a089d5811b 100644 --- a/go/vt/wrangler/keyspace.go +++ b/go/vt/wrangler/keyspace.go @@ -918,32 +918,7 @@ func (wr *Wrangler) startReverseReplication(ctx context.Context, sourceShards [] // updateShardRecords updates the shard records based on 'from' or 'to' direction. 
func (wr *Wrangler) updateShardRecords(ctx context.Context, keyspace string, shards []*topo.ShardInfo, cells []string, servedType topodatapb.TabletType, isFrom bool, clearSourceShards bool) (err error) { - err = wr.ts.UpdateDisableQueryService(ctx, keyspace, shards, servedType, cells, isFrom /* disable */) - if err != nil { - return err - } - - for i, si := range shards { - updatedShard, err := wr.ts.UpdateShardFields(ctx, si.Keyspace(), si.ShardName(), func(si *topo.ShardInfo) error { - if clearSourceShards { - si.SourceShards = nil - } - return nil - }) - - if err != nil { - return err - } - - shards[i] = updatedShard - - // For 'to' shards, refresh to make them serve. - // The 'from' shards will be refreshed after traffic has migrated. - if !isFrom { - wr.RefreshTabletsByShard(ctx, si, cells) - } - } - return nil + return topotools.UpdateShardRecords(ctx, wr.ts, wr.tmc, keyspace, shards, cells, servedType, isFrom, clearSourceShards, wr.Logger()) } // updateFrozenFlag sets or unsets the Frozen flag for master migration. This is performed @@ -1384,44 +1359,7 @@ func (wr *Wrangler) SetKeyspaceServedFrom(ctx context.Context, keyspace string, // RefreshTabletsByShard calls RefreshState on all the tablets in a given shard. 
func (wr *Wrangler) RefreshTabletsByShard(ctx context.Context, si *topo.ShardInfo, cells []string) error { - wr.Logger().Infof("RefreshTabletsByShard called on shard %v/%v", si.Keyspace(), si.ShardName()) - tabletMap, err := wr.ts.GetTabletMapForShardByCell(ctx, si.Keyspace(), si.ShardName(), cells) - switch { - case err == nil: - // keep going - case topo.IsErrType(err, topo.PartialResult): - wr.Logger().Warningf("RefreshTabletsByShard: got partial result for shard %v/%v, may not refresh all tablets everywhere", si.Keyspace(), si.ShardName()) - default: - return err - } - - // ignore errors in this phase - wg := sync.WaitGroup{} - for _, ti := range tabletMap { - if ti.Hostname == "" { - // The tablet is not running, we don't have the host - // name to connect to, so we just skip this tablet. - wr.Logger().Infof("Tablet %v has no hostname, skipping its RefreshState", ti.AliasString()) - continue - } - - wg.Add(1) - go func(ti *topo.TabletInfo) { - wr.Logger().Infof("Calling RefreshState on tablet %v", ti.AliasString()) - // Setting an upper bound timeout to fail faster in case of an error. - // Using 60 seconds because RefreshState should not take more than 30 seconds. - // (RefreshState will restart the tablet's QueryService and most time will be spent on the shutdown, i.e. waiting up to 30 seconds on transactions (see Config.TransactionTimeout)). 
- ctx, cancel := context.WithTimeout(ctx, 60*time.Second) - if err := wr.tmc.RefreshState(ctx, ti.Tablet); err != nil { - wr.Logger().Warningf("RefreshTabletsByShard: failed to refresh %v: %v", ti.AliasString(), err) - } - cancel() - wg.Done() - }(ti) - } - wg.Wait() - - return nil + return topotools.RefreshTabletsByShard(ctx, wr.ts, wr.tmc, si, cells, wr.Logger()) } // DeleteKeyspace will do all the necessary changes in the topology server diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index 06c92c3f88a..f4003c9a9f4 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -1788,27 +1788,11 @@ func (ts *trafficSwitcher) deleteRoutingRules(ctx context.Context) error { } func (wr *Wrangler) getRoutingRules(ctx context.Context) (map[string][]string, error) { - rrs, err := wr.ts.GetRoutingRules(ctx) - if err != nil { - return nil, err - } - rules := make(map[string][]string, len(rrs.Rules)) - for _, rr := range rrs.Rules { - rules[rr.FromTable] = rr.ToTables - } - return rules, nil + return topotools.GetRoutingRules(ctx, wr.ts) } func (wr *Wrangler) saveRoutingRules(ctx context.Context, rules map[string][]string) error { - log.Infof("Saving routing rules %v\n", rules) - rrs := &vschemapb.RoutingRules{Rules: make([]*vschemapb.RoutingRule, 0, len(rules))} - for from, to := range rules { - rrs.Rules = append(rrs.Rules, &vschemapb.RoutingRule{ - FromTable: from, - ToTables: to, - }) - } - return wr.ts.SaveRoutingRules(ctx, rrs) + return topotools.SaveRoutingRules(ctx, wr.ts, rules) } func reverseName(workflow string) string { From b9f463fa1e1e651c03e65f22dfbf9d296152bca9 Mon Sep 17 00:00:00 2001 From: Andrew Mason Date: Wed, 28 Apr 2021 13:07:48 -0400 Subject: [PATCH 2/2] Log errors from RefreshTabletsByShard Signed-off-by: Andrew Mason --- go/vt/topotools/keyspace.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/go/vt/topotools/keyspace.go b/go/vt/topotools/keyspace.go index 
a166da169bb..480adc6790f 100644 --- a/go/vt/topotools/keyspace.go +++ b/go/vt/topotools/keyspace.go @@ -111,7 +111,9 @@ func UpdateShardRecords( // For 'to' shards, refresh to make them serve. The 'from' shards will // be refreshed after traffic has migrated. if !isFrom { - _ = RefreshTabletsByShard(ctx, ts, tmc, si, cells, logger) + if err := RefreshTabletsByShard(ctx, ts, tmc, si, cells, logger); err != nil { + logger.Warningf("RefreshTabletsByShard(%v/%v, cells=%v) failed with %v; continuing ...", si.Keyspace(), si.ShardName(), cells, err) + } } }