Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wrangler|topotools] Migrate UpdateShardRecords, RefreshTabletsByShard, and {Get,Save}RoutingRules #7965

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 119 additions & 0 deletions go/vt/topotools/keyspace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
Copyright 2021 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package topotools

import (
"context"
"sync"
"time"

"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vttablet/tmclient"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

// RefreshTabletsByShard asks every tablet of the given shard, in the given
// cells, to refresh its state.
//
// The only errors surfaced to the caller come from reading the shard's tablet
// map out of the topology, and a partial result is downgraded to a warning so
// that the tablets which were found still get refreshed. Failures of individual
// RefreshState RPCs are logged as warnings and otherwise dropped. Tablets whose
// topology record has an empty Hostname are skipped.
func RefreshTabletsByShard(ctx context.Context, ts *topo.Server, tmc tmclient.TabletManagerClient, si *topo.ShardInfo, cells []string, logger logutil.Logger) error {
	logger.Infof("RefreshTabletsByShard called on shard %v/%v", si.Keyspace(), si.ShardName())

	tablets, err := ts.GetTabletMapForShardByCell(ctx, si.Keyspace(), si.ShardName(), cells)
	if err != nil {
		if !topo.IsErrType(err, topo.PartialResult) {
			return err
		}

		logger.Warningf("RefreshTabletsByShard: got partial result for shard %v/%v, may not refresh all tablets everywhere", si.Keyspace(), si.ShardName())
	}

	// refreshOne issues a single RefreshState RPC, bounded by its own 60 second
	// timeout, and logs (rather than returns) any failure.
	refreshOne := func(tablet *topo.TabletInfo) {
		logger.Infof("Calling RefreshState on tablet %v", tablet.AliasString())

		rpcCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
		defer cancel()

		rpcErr := tmc.RefreshState(rpcCtx, tablet.Tablet)
		if rpcErr == nil {
			return
		}

		logger.Warningf("RefreshTabletsByShard: failed to refresh %v: %v", tablet.AliasString(), rpcErr)
	}

	// From here on nothing is reported back to the caller; all tablets are
	// refreshed concurrently and we only wait for them to finish.
	var inFlight sync.WaitGroup
	for _, tablet := range tablets {
		if tablet.Hostname == "" {
			// Without a hostname there is nothing to connect to, which means
			// the tablet is not running.
			logger.Infof("Tablet %v has no hostname, skipping its RefreshState", tablet.AliasString())
			continue
		}

		inFlight.Add(1)
		go func(tablet *topo.TabletInfo) {
			defer inFlight.Done()
			refreshOne(tablet)
		}(tablet)
	}
	inFlight.Wait()

	return nil
}

// UpdateShardRecords updates the shard records based on 'from' or 'to'
// direction.
//
// It first calls ts.UpdateDisableQueryService for the given keyspace, shards,
// servedType and cells, passing isFrom as the disable flag. It then runs
// ts.UpdateShardFields on each shard in turn, clearing SourceShards when
// clearSourceShards is set, and writes the returned ShardInfo back into the
// shards slice, so the caller's slice is modified in place. For 'to' shards
// (isFrom == false) the tablets of each shard are refreshed as well.
//
// The first error from UpdateDisableQueryService or UpdateShardFields is
// returned immediately, leaving any remaining shards unprocessed. The error
// returned by RefreshTabletsByShard is discarded.
func UpdateShardRecords(
ctx context.Context,
ts *topo.Server,
tmc tmclient.TabletManagerClient,
keyspace string,
shards []*topo.ShardInfo,
cells []string,
servedType topodatapb.TabletType,
isFrom bool,
clearSourceShards bool,
logger logutil.Logger,
) error {
// The disable flag simply mirrors the direction: true for 'from' shards,
// false for 'to' shards.
disableQueryService := isFrom
if err := ts.UpdateDisableQueryService(ctx, keyspace, shards, servedType, cells, disableQueryService); err != nil {
return err
}

for i, si := range shards {
// The callback's si parameter shadows the loop variable; it is the record
// being updated, not the element of shards.
updatedShard, err := ts.UpdateShardFields(ctx, si.Keyspace(), si.ShardName(), func(si *topo.ShardInfo) error {
if clearSourceShards {
si.SourceShards = nil
}

return nil
})

if err != nil {
return err
}

// Hand the refreshed record back to the caller through the input slice.
shards[i] = updatedShard

// For 'to' shards, refresh to make them serve. The 'from' shards will
// be refreshed after traffic has migrated.
if !isFrom {
// The returned error is deliberately discarded. RefreshTabletsByShard logs
// RefreshState RPC failures itself, but a failed tablet-map lookup is
// returned without being logged, so that case is dropped silently here.
// Note that the pre-update si is passed rather than updatedShard;
// RefreshTabletsByShard only reads its keyspace and shard name.
_ = RefreshTabletsByShard(ctx, ts, tmc, si, cells, logger)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this pattern of being explicit that we are ignoring the error. However, it is not a pattern that I've seen in the codebase.

I wonder if starting to add it here will create confusion.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My opinion is being that explicit about ignoring the error is a better (or at least more clear) pattern, and we've got to start somewhere! 😂 I'll do a quick temp check to see what some other folks think about this

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds like a plan. If we can get thumbs-up that we would like to make this a standard pattern moving forward, let's start here!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consensus seemed to be:

  • yes, explicitly acknowledging that we're ignoring errors is better than implicitly doing so
  • we've had lots of trouble debugging tests specifically due to errors being discarded
  • we should log the ignored error in this specific case, and whether to log or not is probably a case-by-case basis
  • i'm also going to work on adding errcheck to the linter so we can start to enforce this as a pattern going forward

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great, thanks for following up on this. All makes sense.

}
}

return nil
}
58 changes: 58 additions & 0 deletions go/vt/topotools/routing_rules.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
Copyright 2021 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package topotools

import (
"context"

"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"

vschemapb "vitess.io/vitess/go/vt/proto/vschema"
)

// GetRoutingRules reads the routing rules stored in the topology server and
// flattens them into a map keyed by each rule's FromTable, whose value is that
// rule's ToTables slice.
//
// If the topology read fails, the error is returned unchanged together with a
// nil map. When several stored rules share a FromTable, the last one in the
// stored list wins.
func GetRoutingRules(ctx context.Context, ts *topo.Server) (map[string][]string, error) {
	stored, err := ts.GetRoutingRules(ctx)
	if err != nil {
		return nil, err
	}

	byFromTable := make(map[string][]string, len(stored.Rules))
	for i := range stored.Rules {
		rule := stored.Rules[i]
		byFromTable[rule.FromTable] = rule.ToTables
	}

	return byFromTable, nil
}

// SaveRoutingRules turns a fromTable=>[]toTables mapping into a
// vschemapb.RoutingRules protobuf message, one RoutingRule per map entry, and
// writes that message to the topology server.
//
// The rules are logged before being saved. Because they are collected by
// ranging over a map, the order of the saved rule list is unspecified. The
// error from the topology write, if any, is returned as-is.
func SaveRoutingRules(ctx context.Context, ts *topo.Server, rules map[string][]string) error {
	log.Infof("Saving routing rules %v\n", rules)

	converted := make([]*vschemapb.RoutingRule, 0, len(rules))
	for fromTable, toTables := range rules {
		rule := &vschemapb.RoutingRule{
			FromTable: fromTable,
			ToTables:  toTables,
		}
		converted = append(converted, rule)
	}

	return ts.SaveRoutingRules(ctx, &vschemapb.RoutingRules{Rules: converted})
}
68 changes: 68 additions & 0 deletions go/vt/topotools/routing_rules_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
Copyright 2021 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package topotools

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/topo/memorytopo"
)

// TestRoutingRulesRoundTrip saves a set of routing rules to an in-memory
// topology and checks that reading them back yields an equal mapping.
func TestRoutingRulesRoundTrip(t *testing.T) {
	var (
		ctx  = context.Background()
		ts   = memorytopo.NewServer("zone1")
		want = map[string][]string{
			"t1": {"t2", "t3"},
			"t4": {"t5"},
		}
	)

	// Both steps are hard requirements: without a successful save and fetch
	// there is nothing meaningful to compare.
	saveErr := SaveRoutingRules(ctx, ts, want)
	require.NoError(t, saveErr, "could not save routing rules to topo %v", want)

	got, fetchErr := GetRoutingRules(ctx, ts)
	require.NoError(t, fetchErr, "could not fetch routing rules from topo")

	assert.Equal(t, want, got)
}

// TestRoutingRulesErrors verifies that both GetRoutingRules and
// SaveRoutingRules propagate an error when the underlying topology fails.
//
// The in-memory topo factory is configured with an error up front, so both
// subtests run against the same failing server.
func TestRoutingRulesErrors(t *testing.T) {
	ctx := context.Background()
	ts, factory := memorytopo.NewServerAndFactory("zone1")
	factory.SetError(errors.New("topo failure for testing"))

	t.Run("GetRoutingRules error", func(t *testing.T) {
		rules, err := GetRoutingRules(ctx, ts)
		assert.Error(t, err, "expected error from GetRoutingRules, got rules=%v", rules)
	})

	t.Run("SaveRoutingRules error", func(t *testing.T) {
		rules := map[string][]string{
			"t1": {"t2", "t3"},
			"t4": {"t5"},
		}

		err := SaveRoutingRules(ctx, ts, rules)
		// The failure message must name the function under test; it previously
		// said GetRoutingRules, which pointed at the wrong call when this
		// subtest failed.
		assert.Error(t, err, "expected error from SaveRoutingRules for rules=%v", rules)
	})
}
66 changes: 2 additions & 64 deletions go/vt/wrangler/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -918,32 +918,7 @@ func (wr *Wrangler) startReverseReplication(ctx context.Context, sourceShards []

// updateShardRecords updates the shard records based on 'from' or 'to' direction.
//
// It is a thin wrapper that delegates to topotools.UpdateShardRecords, passing
// along the wrangler's topo server, tablet manager client and logger. The
// shared implementation replaces the previously inlined copy of the same
// logic, which ended in an unconditional `return nil` and left this delegation
// unreachable.
func (wr *Wrangler) updateShardRecords(ctx context.Context, keyspace string, shards []*topo.ShardInfo, cells []string, servedType topodatapb.TabletType, isFrom bool, clearSourceShards bool) (err error) {
	return topotools.UpdateShardRecords(ctx, wr.ts, wr.tmc, keyspace, shards, cells, servedType, isFrom, clearSourceShards, wr.Logger())
}

// updateFrozenFlag sets or unsets the Frozen flag for master migration. This is performed
Expand Down Expand Up @@ -1384,44 +1359,7 @@ func (wr *Wrangler) SetKeyspaceServedFrom(ctx context.Context, keyspace string,

// RefreshTabletsByShard calls RefreshState on all the tablets in a given shard.
//
// It is a thin wrapper that delegates to topotools.RefreshTabletsByShard,
// passing along the wrangler's topo server, tablet manager client and logger.
// The previously inlined copy of the logic returned nil before reaching this
// delegation, and released its per-tablet context and WaitGroup slot without
// defer; the shared implementation uses defer for both.
func (wr *Wrangler) RefreshTabletsByShard(ctx context.Context, si *topo.ShardInfo, cells []string) error {
	return topotools.RefreshTabletsByShard(ctx, wr.ts, wr.tmc, si, cells, wr.Logger())
}

// DeleteKeyspace will do all the necessary changes in the topology server
Expand Down
20 changes: 2 additions & 18 deletions go/vt/wrangler/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -1788,27 +1788,11 @@ func (ts *trafficSwitcher) deleteRoutingRules(ctx context.Context) error {
}

// getRoutingRules fetches the routing rules from the wrangler's topology server
// as a mapping of fromTable=>[]toTables.
//
// It delegates to topotools.GetRoutingRules; the previously inlined copy of the
// same conversion returned before this delegation could be reached.
func (wr *Wrangler) getRoutingRules(ctx context.Context) (map[string][]string, error) {
	return topotools.GetRoutingRules(ctx, wr.ts)
}

// saveRoutingRules saves a mapping of fromTable=>[]toTables to the wrangler's
// topology server as routing rules.
//
// It delegates to topotools.SaveRoutingRules, which also logs the rules being
// saved; the previously inlined copy of the same logic returned before this
// delegation could be reached.
func (wr *Wrangler) saveRoutingRules(ctx context.Context, rules map[string][]string) error {
	return topotools.SaveRoutingRules(ctx, wr.ts, rules)
}

func reverseName(workflow string) string {
Expand Down