Skip to content
This repository has been archived by the owner on Dec 16, 2022. It is now read-only.

Commit

Permalink
Merge pull request vitessio#7965 from tinyspeck/am_move_wrangler_topo…
Browse files Browse the repository at this point in the history
…funcs_to_topotools

[wrangler|topotools] Migrate `UpdateShardRecords`, `RefreshTabletsByShard`, and `{Get,Save}RoutingRules`
  • Loading branch information
rafael authored Apr 30, 2021
2 parents f5ed5b9 + b9f463f commit 4932904
Show file tree
Hide file tree
Showing 5 changed files with 251 additions and 82 deletions.
121 changes: 121 additions & 0 deletions go/vt/topotools/keyspace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
Copyright 2021 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package topotools

import (
"context"
"sync"
"time"

"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vttablet/tmclient"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

// RefreshTabletsByShard calls RefreshState on all the tablets in a given shard.
//
// It only returns errors from looking up the tablet map from the topology;
// errors returned from any RefreshState RPCs are logged and then ignored. Also,
// any tablets without a .Hostname set in the topology are skipped.
func RefreshTabletsByShard(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, si *topo.ShardInfo, cells []string, logger logutil.Logger) error {
logger.Infof("RefreshTabletsByShard called on shard %v/%v", si.Keyspace(), si.ShardName())

tabletMap, err := ts.GetTabletMapForShardByCell(ctx, si.Keyspace(), si.ShardName(), cells)
switch {
case err == nil:
// keep going
case topo.IsErrType(err, topo.PartialResult):
logger.Warningf("RefreshTabletsByShard: got partial result for shard %v/%v, may not refresh all tablets everywhere", si.Keyspace(), si.ShardName())
default:
return err
}

// Any errors from this point onward are ignored.
var wg sync.WaitGroup
for _, ti := range tabletMap {
if ti.Hostname == "" {
// The tablet is not running, we don't have the host
// name to connect to, so we just skip this tablet.
logger.Infof("Tablet %v has no hostname, skipping its RefreshState", ti.AliasString())
continue
}

wg.Add(1)
go func(ti *topo.TabletInfo) {
defer wg.Done()
logger.Infof("Calling RefreshState on tablet %v", ti.AliasString())

ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
defer cancel()

if err := tmc.RefreshState(ctx, ti.Tablet); err != nil {
logger.Warningf("RefreshTabletsByShard: failed to refresh %v: %v", ti.AliasString(), err)
}
}(ti)
}

wg.Wait()
return nil
}

// UpdateShardRecords updates the shard records based on 'from' or 'to'
// direction.
func UpdateShardRecords(
ctx context.Context,
ts *topo.Server,
tmc tmclient.TabletManagerClient,
keyspace string,
shards []*topo.ShardInfo,
cells []string,
servedType topodatapb.TabletType,
isFrom bool,
clearSourceShards bool,
logger logutil.Logger,
) error {
disableQueryService := isFrom
if err := ts.UpdateDisableQueryService(ctx, keyspace, shards, servedType, cells, disableQueryService); err != nil {
return err
}

for i, si := range shards {
updatedShard, err := ts.UpdateShardFields(ctx, si.Keyspace(), si.ShardName(), func(si *topo.ShardInfo) error {
if clearSourceShards {
si.SourceShards = nil
}

return nil
})

if err != nil {
return err
}

shards[i] = updatedShard

// For 'to' shards, refresh to make them serve. The 'from' shards will
// be refreshed after traffic has migrated.
if !isFrom {
if err := RefreshTabletsByShard(ctx, ts, tmc, si, cells, logger); err != nil {
logger.Warningf("RefreshTabletsByShard(%v/%v, cells=%v) failed with %v; continuing ...", si.Keyspace(), si.ShardName(), cells, err)
}
}
}

return nil
}
58 changes: 58 additions & 0 deletions go/vt/topotools/routing_rules.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
Copyright 2021 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package topotools

import (
"context"

"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"

vschemapb "vitess.io/vitess/go/vt/proto/vschema"
)

// GetRoutingRules fetches routing rules from the topology server and returns a
// mapping of fromTable=>[]toTables.
func GetRoutingRules(ctx context.Context, ts *topo.Server) (map[string][]string, error) {
rrs, err := ts.GetRoutingRules(ctx)
if err != nil {
return nil, err
}

rules := make(map[string][]string, len(rrs.Rules))
for _, rr := range rrs.Rules {
rules[rr.FromTable] = rr.ToTables
}

return rules, nil
}

// SaveRoutingRules converts a mapping of fromTable=>[]toTables into a
// vschemapb.RoutingRules protobuf message and saves it in the topology.
func SaveRoutingRules(ctx context.Context, ts *topo.Server, rules map[string][]string) error {
log.Infof("Saving routing rules %v\n", rules)

rrs := &vschemapb.RoutingRules{Rules: make([]*vschemapb.RoutingRule, 0, len(rules))}
for from, to := range rules {
rrs.Rules = append(rrs.Rules, &vschemapb.RoutingRule{
FromTable: from,
ToTables: to,
})
}

return ts.SaveRoutingRules(ctx, rrs)
}
68 changes: 68 additions & 0 deletions go/vt/topotools/routing_rules_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
Copyright 2021 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package topotools

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/topo/memorytopo"
)

func TestRoutingRulesRoundTrip(t *testing.T) {
ctx := context.Background()
ts := memorytopo.NewServer("zone1")

rules := map[string][]string{
"t1": {"t2", "t3"},
"t4": {"t5"},
}

err := SaveRoutingRules(ctx, ts, rules)
require.NoError(t, err, "could not save routing rules to topo %v", rules)

roundtripRules, err := GetRoutingRules(ctx, ts)
require.NoError(t, err, "could not fetch routing rules from topo")

assert.Equal(t, rules, roundtripRules)
}

func TestRoutingRulesErrors(t *testing.T) {
ctx := context.Background()
ts, factory := memorytopo.NewServerAndFactory("zone1")
factory.SetError(errors.New("topo failure for testing"))

t.Run("GetRoutingRules error", func(t *testing.T) {

rules, err := GetRoutingRules(ctx, ts)
assert.Error(t, err, "expected error from GetRoutingRules, got rules=%v", rules)
})

t.Run("SaveRoutingRules error", func(t *testing.T) {
rules := map[string][]string{
"t1": {"t2", "t3"},
"t4": {"t5"},
}

err := SaveRoutingRules(ctx, ts, rules)
assert.Error(t, err, "expected error from GetRoutingRules, got rules=%v", rules)
})
}
66 changes: 2 additions & 64 deletions go/vt/wrangler/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -918,32 +918,7 @@ func (wr *Wrangler) startReverseReplication(ctx context.Context, sourceShards []

// updateShardRecords updates the shard records based on 'from' or 'to' direction.
func (wr *Wrangler) updateShardRecords(ctx context.Context, keyspace string, shards []*topo.ShardInfo, cells []string, servedType topodatapb.TabletType, isFrom bool, clearSourceShards bool) (err error) {
err = wr.ts.UpdateDisableQueryService(ctx, keyspace, shards, servedType, cells, isFrom /* disable */)
if err != nil {
return err
}

for i, si := range shards {
updatedShard, err := wr.ts.UpdateShardFields(ctx, si.Keyspace(), si.ShardName(), func(si *topo.ShardInfo) error {
if clearSourceShards {
si.SourceShards = nil
}
return nil
})

if err != nil {
return err
}

shards[i] = updatedShard

// For 'to' shards, refresh to make them serve.
// The 'from' shards will be refreshed after traffic has migrated.
if !isFrom {
wr.RefreshTabletsByShard(ctx, si, cells)
}
}
return nil
return topotools.UpdateShardRecords(ctx, wr.ts, wr.tmc, keyspace, shards, cells, servedType, isFrom, clearSourceShards, wr.Logger())
}

// updateFrozenFlag sets or unsets the Frozen flag for master migration. This is performed
Expand Down Expand Up @@ -1384,44 +1359,7 @@ func (wr *Wrangler) SetKeyspaceServedFrom(ctx context.Context, keyspace string,

// RefreshTabletsByShard calls RefreshState on all the tablets in a given shard.
func (wr *Wrangler) RefreshTabletsByShard(ctx context.Context, si *topo.ShardInfo, cells []string) error {
wr.Logger().Infof("RefreshTabletsByShard called on shard %v/%v", si.Keyspace(), si.ShardName())
tabletMap, err := wr.ts.GetTabletMapForShardByCell(ctx, si.Keyspace(), si.ShardName(), cells)
switch {
case err == nil:
// keep going
case topo.IsErrType(err, topo.PartialResult):
wr.Logger().Warningf("RefreshTabletsByShard: got partial result for shard %v/%v, may not refresh all tablets everywhere", si.Keyspace(), si.ShardName())
default:
return err
}

// ignore errors in this phase
wg := sync.WaitGroup{}
for _, ti := range tabletMap {
if ti.Hostname == "" {
// The tablet is not running, we don't have the host
// name to connect to, so we just skip this tablet.
wr.Logger().Infof("Tablet %v has no hostname, skipping its RefreshState", ti.AliasString())
continue
}

wg.Add(1)
go func(ti *topo.TabletInfo) {
wr.Logger().Infof("Calling RefreshState on tablet %v", ti.AliasString())
// Setting an upper bound timeout to fail faster in case of an error.
// Using 60 seconds because RefreshState should not take more than 30 seconds.
// (RefreshState will restart the tablet's QueryService and most time will be spent on the shutdown, i.e. waiting up to 30 seconds on transactions (see Config.TransactionTimeout)).
ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
if err := wr.tmc.RefreshState(ctx, ti.Tablet); err != nil {
wr.Logger().Warningf("RefreshTabletsByShard: failed to refresh %v: %v", ti.AliasString(), err)
}
cancel()
wg.Done()
}(ti)
}
wg.Wait()

return nil
return topotools.RefreshTabletsByShard(ctx, wr.ts, wr.tmc, si, cells, wr.Logger())
}

// DeleteKeyspace will do all the necessary changes in the topology server
Expand Down
20 changes: 2 additions & 18 deletions go/vt/wrangler/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -1651,27 +1651,11 @@ func (ts *trafficSwitcher) deleteRoutingRules(ctx context.Context) error {
}

func (wr *Wrangler) getRoutingRules(ctx context.Context) (map[string][]string, error) {
rrs, err := wr.ts.GetRoutingRules(ctx)
if err != nil {
return nil, err
}
rules := make(map[string][]string, len(rrs.Rules))
for _, rr := range rrs.Rules {
rules[rr.FromTable] = rr.ToTables
}
return rules, nil
return topotools.GetRoutingRules(ctx, wr.ts)
}

func (wr *Wrangler) saveRoutingRules(ctx context.Context, rules map[string][]string) error {
log.Infof("Saving routing rules %v\n", rules)
rrs := &vschemapb.RoutingRules{Rules: make([]*vschemapb.RoutingRule, 0, len(rules))}
for from, to := range rules {
rrs.Rules = append(rrs.Rules, &vschemapb.RoutingRule{
FromTable: from,
ToTables: to,
})
}
return wr.ts.SaveRoutingRules(ctx, rrs)
return topotools.SaveRoutingRules(ctx, wr.ts, rules)
}

// addParticipatingTablesToKeyspace updates the vschema with the new tables that were created as part of the
Expand Down

0 comments on commit 4932904

Please sign in to comment.