diff --git a/go/vt/wrangler/fake_dbclient_test.go b/go/vt/wrangler/fake_dbclient_test.go index 057d7bacd75..7bcc5f5bcf2 100644 --- a/go/vt/wrangler/fake_dbclient_test.go +++ b/go/vt/wrangler/fake_dbclient_test.go @@ -152,10 +152,6 @@ func (dc *fakeDBClient) Rollback() error { func (dc *fakeDBClient) Close() { } -func (dc *fakeDBClient) id() string { - return fmt.Sprintf("FakeDBClient(%s)", dc.name) -} - // ExecuteFetch is part of the DBClient interface func (dc *fakeDBClient) ExecuteFetch(query string, maxrows int) (*sqltypes.Result, error) { dc.mu.Lock() diff --git a/go/vt/wrangler/keyspace.go b/go/vt/wrangler/keyspace.go index bf01ce9bf24..7f3f00da4f8 100644 --- a/go/vt/wrangler/keyspace.go +++ b/go/vt/wrangler/keyspace.go @@ -20,7 +20,6 @@ import ( "bytes" "context" "fmt" - "strings" "sync" "time" @@ -93,112 +92,6 @@ func (wr *Wrangler) validateNewWorkflow(ctx context.Context, keyspace, workflow return allErrors.AggrError(vterrors.Aggregate) } -func (wr *Wrangler) printShards(ctx context.Context, si []*topo.ShardInfo) error { - for _, si := range si { - wr.Logger().Printf(" Shard: %v\n", si.ShardName()) - if len(si.SourceShards) != 0 { - wr.Logger().Printf(" Source Shards: %v\n", si.SourceShards) - } - ti, err := wr.ts.GetTablet(ctx, si.PrimaryAlias) - if err != nil { - return err - } - qr, err := wr.tmc.VReplicationExec(ctx, ti.Tablet, fmt.Sprintf("select * from _vt.vreplication where db_name=%v", encodeString(ti.DbName()))) - if err != nil { - return err - } - res := sqltypes.Proto3ToResult(qr) - if len(res.Rows) != 0 { - wr.Logger().Printf(" VReplication:\n") - for _, row := range res.Rows { - wr.Logger().Printf(" %v\n", row) - } - } - wr.Logger().Printf(" Is Primary Serving: %v\n", si.IsPrimaryServing) - if len(si.TabletControls) != 0 { - wr.Logger().Printf(" Tablet Controls: %v\n", si.TabletControls) - } - } - return nil -} - -func (wr *Wrangler) getPrimaryPositions(ctx context.Context, shards []*topo.ShardInfo) (map[*topo.ShardInfo]string, error) { - mu := sync.Mutex{} - result := make(map[*topo.ShardInfo]string) - - wg := sync.WaitGroup{} - rec := concurrency.AllErrorRecorder{} - for _, si := range shards { - wg.Add(1) - go func(si *topo.ShardInfo) { - defer wg.Done() - wr.Logger().Infof("Gathering primary position for %v", topoproto.TabletAliasString(si.PrimaryAlias)) - ti, err := wr.ts.GetTablet(ctx, si.PrimaryAlias) - if err != nil { - rec.RecordError(err) - return - } - - pos, err := wr.tmc.PrimaryPosition(ctx, ti.Tablet) - if err != nil { - rec.RecordError(err) - return - } - - wr.Logger().Infof("Got primary position for %v", topoproto.TabletAliasString(si.PrimaryAlias)) - mu.Lock() - result[si] = pos - mu.Unlock() - }(si) - } - wg.Wait() - return result, rec.Error() -} - -func (wr *Wrangler) waitForFilteredReplication(ctx context.Context, sourcePositions map[*topo.ShardInfo]string, destinationShards []*topo.ShardInfo, waitTime time.Duration) error { - wg := sync.WaitGroup{} - rec := concurrency.AllErrorRecorder{} - for _, si := range destinationShards { - wg.Add(1) - go func(si *topo.ShardInfo) { - defer wg.Done() - ctx, cancel := context.WithTimeout(ctx, waitTime) - defer cancel() - - var pos string - for _, sourceShard := range si.SourceShards { - // find the position it should be at - for s, sp := range sourcePositions { - if s.Keyspace() == sourceShard.Keyspace && s.ShardName() == sourceShard.Shard { - pos = sp - break - } - } - - // and wait for it - wr.Logger().Infof("Waiting for %v to catch up", topoproto.TabletAliasString(si.PrimaryAlias)) - ti, err := wr.ts.GetTablet(ctx, si.PrimaryAlias) - if err != nil { - rec.RecordError(err) - return - } - - if err := wr.tmc.VReplicationWaitForPos(ctx, ti.Tablet, sourceShard.Uid, pos); err != nil { - if strings.Contains(err.Error(), "not found") { - wr.Logger().Infof("%v stream %d was not found. Skipping wait.", topoproto.TabletAliasString(si.PrimaryAlias), sourceShard.Uid) - } else { - rec.RecordError(err) - } - } else { - wr.Logger().Infof("%v caught up", topoproto.TabletAliasString(si.PrimaryAlias)) - } - } - }(si) - } - wg.Wait() - return rec.Error() -} - // refreshPrimaryTablets will just RPC-ping all the primary tablets with RefreshState func (wr *Wrangler) refreshPrimaryTablets(ctx context.Context, shards []*topo.ShardInfo) error { wg := sync.WaitGroup{} @@ -230,33 +123,6 @@ func (wr *Wrangler) updateShardRecords(ctx context.Context, keyspace string, sha return topotools.UpdateShardRecords(ctx, wr.ts, wr.tmc, keyspace, shards, cells, servedType, isFrom, clearSourceShards, wr.Logger()) } -// updateFrozenFlag sets or unsets the Frozen flag for primary migration. This is performed -// for all primary tablet control records. -func (wr *Wrangler) updateFrozenFlag(ctx context.Context, shards []*topo.ShardInfo, value bool) (err error) { - for i, si := range shards { - updatedShard, err := wr.ts.UpdateShardFields(ctx, si.Keyspace(), si.ShardName(), func(si *topo.ShardInfo) error { - tc := si.GetTabletControl(topodatapb.TabletType_PRIMARY) - if tc != nil { - tc.Frozen = value - return nil - } - // This shard does not have a tablet control record, adding one to set frozen flag - tc = &topodatapb.Shard_TabletControl{ - TabletType: topodatapb.TabletType_PRIMARY, - Frozen: value, - } - si.TabletControls = append(si.TabletControls, tc) - return nil - }) - if err != nil { - return err - } - - shards[i] = updatedShard - } - return nil -} - func encodeString(in string) string { buf := bytes.NewBuffer(nil) sqltypes.NewVarChar(in).EncodeSQL(buf) diff --git a/go/vt/wrangler/testlib/version_test.go b/go/vt/wrangler/testlib/version_test.go index 634f48e4d7e..102bcdfe6e5 100644 --- a/go/vt/wrangler/testlib/version_test.go +++ b/go/vt/wrangler/testlib/version_test.go @@ -66,10 +66,6 @@ func TestVersion(t *testing.T) { }() discovery.SetTabletPickerRetryDelay(5 * time.Millisecond) - // We need to run this test with the /debug/vars version of the - // plugin. - wrangler.ResetDebugVarsGetVersion() - // Initialize our environment ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/go/vt/wrangler/version.go b/go/vt/wrangler/version.go index be0bd019331..c93b3c5705a 100644 --- a/go/vt/wrangler/version.go +++ b/go/vt/wrangler/version.go @@ -17,10 +17,7 @@ limitations under the License. package wrangler import ( - "encoding/json" "fmt" - "io" - "net/http" "context" @@ -31,42 +28,6 @@ import ( vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" ) -var getVersionFromTabletDebugVars = func(tabletAddr string) (string, error) { - resp, err := http.Get("http://" + tabletAddr + "/debug/vars") - if err != nil { - return "", err - } - defer resp.Body.Close() - body, err := io.ReadAll(resp.Body) - if err != nil { - return "", err - } - - var vars struct { - BuildHost string - BuildUser string - BuildTimestamp int64 - BuildGitRev string - } - err = json.Unmarshal(body, &vars) - if err != nil { - return "", err - } - - version := fmt.Sprintf("%v", vars) - return version, nil -} - -var getVersionFromTablet = getVersionFromTabletDebugVars - -// ResetDebugVarsGetVersion is used by tests to reset the -// getVersionFromTablet variable to the default one. That way we can -// run the unit tests in testlib/ even when another implementation of -// getVersionFromTablet is used. -func ResetDebugVarsGetVersion() { - getVersionFromTablet = getVersionFromTabletDebugVars -} - // GetVersion returns the version string from a tablet func (wr *Wrangler) GetVersion(ctx context.Context, tabletAlias *topodatapb.TabletAlias) (string, error) { resp, err := wr.VtctldServer().GetVersion(ctx, &vtctldatapb.GetVersionRequest{