Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refresh replicas and rdonly after MigrateServedTypes except on skipRefreshState. #7327

Merged
merged 1 commit into from
Feb 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1032,7 +1032,7 @@ func commandRefreshStateByShard(ctx context.Context, wr *wrangler.Wrangler, subF
if *cellsStr != "" {
cells = strings.Split(*cellsStr, ",")
}
return wr.RefreshTabletsByShard(ctx, si, nil /* tabletTypes */, cells)
return wr.RefreshTabletsByShard(ctx, si, cells)
}

func commandRunHealthCheck(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
Expand Down
25 changes: 14 additions & 11 deletions go/vt/wrangler/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ func (wr *Wrangler) cancelHorizontalResharding(ctx context.Context, keyspace, sh

destinationShards[i] = updatedShard

if err := wr.RefreshTabletsByShard(ctx, si, nil, nil); err != nil {
if err := wr.RefreshTabletsByShard(ctx, si, nil); err != nil {
return err
}
}
Expand Down Expand Up @@ -450,6 +450,8 @@ func (wr *Wrangler) MigrateServedTypes(ctx context.Context, keyspace, shard stri
// refresh
// TODO(b/26388813): Integrate vtctl WaitForDrain here instead of just sleeping.
// Anything that's not a replica will use the RDONLY sleep time.
// Master Migrate performs its own refresh but we will refresh all non master
// tablets after each migration
waitForDrainSleep := *waitForDrainSleepRdonly
if servedType == topodatapb.TabletType_REPLICA {
waitForDrainSleep = *waitForDrainSleepReplica
Expand All @@ -465,7 +467,7 @@ func (wr *Wrangler) MigrateServedTypes(ctx context.Context, keyspace, shard stri
refreshShards = destinationShards
rafael marked this conversation as resolved.
Show resolved Hide resolved
}
for _, si := range refreshShards {
rec.RecordError(wr.RefreshTabletsByShard(ctx, si, []topodatapb.TabletType{servedType}, cells))
rec.RecordError(wr.RefreshTabletsByShard(ctx, si, cells))
}
return rec.Error()
}
Expand Down Expand Up @@ -813,6 +815,12 @@ func (wr *Wrangler) masterMigrateServedType(ctx context.Context, keyspace string
}
}

for _, si := range destinationShards {
if err := wr.RefreshTabletsByShard(ctx, si, nil); err != nil {
return err
}
}

event.DispatchUpdate(ev, "finished")
return nil
}
Expand Down Expand Up @@ -932,7 +940,7 @@ func (wr *Wrangler) updateShardRecords(ctx context.Context, keyspace string, sha
// For 'to' shards, refresh to make them serve.
// The 'from' shards will be refreshed after traffic has migrated.
if !isFrom {
wr.RefreshTabletsByShard(ctx, si, []topodatapb.TabletType{servedType}, cells)
wr.RefreshTabletsByShard(ctx, si, cells)
}
}
return nil
Expand Down Expand Up @@ -1268,7 +1276,7 @@ func (wr *Wrangler) replicaMigrateServedFrom(ctx context.Context, ki *topo.Keysp
// Now refresh the source servers so they reload their
// blacklisted table list
event.DispatchUpdate(ev, "refreshing sources tablets state so they update their blacklisted tables")
return wr.RefreshTabletsByShard(ctx, sourceShard, []topodatapb.TabletType{servedType}, cells)
return wr.RefreshTabletsByShard(ctx, sourceShard, cells)
}

// masterMigrateServedFrom handles the master migration. The ordering is
Expand Down Expand Up @@ -1374,10 +1382,8 @@ func (wr *Wrangler) SetKeyspaceServedFrom(ctx context.Context, keyspace string,
return wr.ts.UpdateKeyspace(ctx, ki)
}

// RefreshTabletsByShard calls RefreshState on all the tables of a
// given type in a shard. It would work for the master, but the
// discovery wouldn't be very efficient.
func (wr *Wrangler) RefreshTabletsByShard(ctx context.Context, si *topo.ShardInfo, tabletTypes []topodatapb.TabletType, cells []string) error {
// RefreshTabletsByShard calls RefreshState on all the tablets in a given shard.
func (wr *Wrangler) RefreshTabletsByShard(ctx context.Context, si *topo.ShardInfo, cells []string) error {
wr.Logger().Infof("RefreshTabletsByShard called on shard %v/%v", si.Keyspace(), si.ShardName())
tabletMap, err := wr.ts.GetTabletMapForShardByCell(ctx, si.Keyspace(), si.ShardName(), cells)
switch {
Expand All @@ -1392,9 +1398,6 @@ func (wr *Wrangler) RefreshTabletsByShard(ctx context.Context, si *topo.ShardInf
// ignore errors in this phase
wg := sync.WaitGroup{}
for _, ti := range tabletMap {
if tabletTypes != nil && !topoproto.IsTypeInList(ti.Type, tabletTypes) {
continue
}
if ti.Hostname == "" {
// The tablet is not running, we don't have the host
// name to connect to, so we just skip this tablet.
Expand Down
6 changes: 3 additions & 3 deletions go/vt/wrangler/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -1129,7 +1129,7 @@ func (ts *trafficSwitcher) changeTableSourceWrites(ctx context.Context, access a
}); err != nil {
return err
}
return ts.wr.RefreshTabletsByShard(ctx, source.si, nil, nil)
return ts.wr.RefreshTabletsByShard(ctx, source.si, nil)
})
}

Expand Down Expand Up @@ -1364,7 +1364,7 @@ func (ts *trafficSwitcher) allowTableTargetWrites(ctx context.Context) error {
}); err != nil {
return err
}
return ts.wr.RefreshTabletsByShard(ctx, target.si, nil, nil)
return ts.wr.RefreshTabletsByShard(ctx, target.si, nil)
})
}

Expand Down Expand Up @@ -1522,7 +1522,7 @@ func (ts *trafficSwitcher) dropSourceBlacklistedTables(ctx context.Context) erro
}); err != nil {
return err
}
return ts.wr.RefreshTabletsByShard(ctx, source.si, nil, nil)
return ts.wr.RefreshTabletsByShard(ctx, source.si, nil)
})
}

Expand Down