Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wrangler] cleanup unused functions #13867

Merged
merged 1 commit into from
Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions go/vt/wrangler/fake_dbclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,6 @@ func (dc *fakeDBClient) Rollback() error {
func (dc *fakeDBClient) Close() {
}

func (dc *fakeDBClient) id() string {
return fmt.Sprintf("FakeDBClient(%s)", dc.name)
}

// ExecuteFetch is part of the DBClient interface
func (dc *fakeDBClient) ExecuteFetch(query string, maxrows int) (*sqltypes.Result, error) {
dc.mu.Lock()
Expand Down
134 changes: 0 additions & 134 deletions go/vt/wrangler/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"bytes"
"context"
"fmt"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -93,112 +92,6 @@ func (wr *Wrangler) validateNewWorkflow(ctx context.Context, keyspace, workflow
return allErrors.AggrError(vterrors.Aggregate)
}

func (wr *Wrangler) printShards(ctx context.Context, si []*topo.ShardInfo) error {
for _, si := range si {
wr.Logger().Printf(" Shard: %v\n", si.ShardName())
if len(si.SourceShards) != 0 {
wr.Logger().Printf(" Source Shards: %v\n", si.SourceShards)
}
ti, err := wr.ts.GetTablet(ctx, si.PrimaryAlias)
if err != nil {
return err
}
qr, err := wr.tmc.VReplicationExec(ctx, ti.Tablet, fmt.Sprintf("select * from _vt.vreplication where db_name=%v", encodeString(ti.DbName())))
if err != nil {
return err
}
res := sqltypes.Proto3ToResult(qr)
if len(res.Rows) != 0 {
wr.Logger().Printf(" VReplication:\n")
for _, row := range res.Rows {
wr.Logger().Printf(" %v\n", row)
}
}
wr.Logger().Printf(" Is Primary Serving: %v\n", si.IsPrimaryServing)
if len(si.TabletControls) != 0 {
wr.Logger().Printf(" Tablet Controls: %v\n", si.TabletControls)
}
}
return nil
}

func (wr *Wrangler) getPrimaryPositions(ctx context.Context, shards []*topo.ShardInfo) (map[*topo.ShardInfo]string, error) {
mu := sync.Mutex{}
result := make(map[*topo.ShardInfo]string)

wg := sync.WaitGroup{}
rec := concurrency.AllErrorRecorder{}
for _, si := range shards {
wg.Add(1)
go func(si *topo.ShardInfo) {
defer wg.Done()
wr.Logger().Infof("Gathering primary position for %v", topoproto.TabletAliasString(si.PrimaryAlias))
ti, err := wr.ts.GetTablet(ctx, si.PrimaryAlias)
if err != nil {
rec.RecordError(err)
return
}

pos, err := wr.tmc.PrimaryPosition(ctx, ti.Tablet)
if err != nil {
rec.RecordError(err)
return
}

wr.Logger().Infof("Got primary position for %v", topoproto.TabletAliasString(si.PrimaryAlias))
mu.Lock()
result[si] = pos
mu.Unlock()
}(si)
}
wg.Wait()
return result, rec.Error()
}

func (wr *Wrangler) waitForFilteredReplication(ctx context.Context, sourcePositions map[*topo.ShardInfo]string, destinationShards []*topo.ShardInfo, waitTime time.Duration) error {
wg := sync.WaitGroup{}
rec := concurrency.AllErrorRecorder{}
for _, si := range destinationShards {
wg.Add(1)
go func(si *topo.ShardInfo) {
defer wg.Done()
ctx, cancel := context.WithTimeout(ctx, waitTime)
defer cancel()

var pos string
for _, sourceShard := range si.SourceShards {
// find the position it should be at
for s, sp := range sourcePositions {
if s.Keyspace() == sourceShard.Keyspace && s.ShardName() == sourceShard.Shard {
pos = sp
break
}
}

// and wait for it
wr.Logger().Infof("Waiting for %v to catch up", topoproto.TabletAliasString(si.PrimaryAlias))
ti, err := wr.ts.GetTablet(ctx, si.PrimaryAlias)
if err != nil {
rec.RecordError(err)
return
}

if err := wr.tmc.VReplicationWaitForPos(ctx, ti.Tablet, sourceShard.Uid, pos); err != nil {
if strings.Contains(err.Error(), "not found") {
wr.Logger().Infof("%v stream %d was not found. Skipping wait.", topoproto.TabletAliasString(si.PrimaryAlias), sourceShard.Uid)
} else {
rec.RecordError(err)
}
} else {
wr.Logger().Infof("%v caught up", topoproto.TabletAliasString(si.PrimaryAlias))
}
}
}(si)
}
wg.Wait()
return rec.Error()
}

// refreshPrimaryTablets will just RPC-ping all the primary tablets with RefreshState
func (wr *Wrangler) refreshPrimaryTablets(ctx context.Context, shards []*topo.ShardInfo) error {
wg := sync.WaitGroup{}
Expand Down Expand Up @@ -230,33 +123,6 @@ func (wr *Wrangler) updateShardRecords(ctx context.Context, keyspace string, sha
return topotools.UpdateShardRecords(ctx, wr.ts, wr.tmc, keyspace, shards, cells, servedType, isFrom, clearSourceShards, wr.Logger())
}

// updateFrozenFlag sets or unsets the Frozen flag for primary migration. This is performed
// for all primary tablet control records.
func (wr *Wrangler) updateFrozenFlag(ctx context.Context, shards []*topo.ShardInfo, value bool) (err error) {
for i, si := range shards {
updatedShard, err := wr.ts.UpdateShardFields(ctx, si.Keyspace(), si.ShardName(), func(si *topo.ShardInfo) error {
tc := si.GetTabletControl(topodatapb.TabletType_PRIMARY)
if tc != nil {
tc.Frozen = value
return nil
}
// This shard does not have a tablet control record, adding one to set frozen flag
tc = &topodatapb.Shard_TabletControl{
TabletType: topodatapb.TabletType_PRIMARY,
Frozen: value,
}
si.TabletControls = append(si.TabletControls, tc)
return nil
})
if err != nil {
return err
}

shards[i] = updatedShard
}
return nil
}

func encodeString(in string) string {
buf := bytes.NewBuffer(nil)
sqltypes.NewVarChar(in).EncodeSQL(buf)
Expand Down
4 changes: 0 additions & 4 deletions go/vt/wrangler/testlib/version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,6 @@ func TestVersion(t *testing.T) {
}()
discovery.SetTabletPickerRetryDelay(5 * time.Millisecond)

// We need to run this test with the /debug/vars version of the
// plugin.
wrangler.ResetDebugVarsGetVersion()

// Initialize our environment
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
39 changes: 0 additions & 39 deletions go/vt/wrangler/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@ limitations under the License.
package wrangler

import (
"encoding/json"
"fmt"
"io"
"net/http"

"context"

Expand All @@ -31,42 +28,6 @@ import (
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
)

var getVersionFromTabletDebugVars = func(tabletAddr string) (string, error) {
resp, err := http.Get("http://" + tabletAddr + "/debug/vars")
if err != nil {
return "", err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
}

var vars struct {
BuildHost string
BuildUser string
BuildTimestamp int64
BuildGitRev string
}
err = json.Unmarshal(body, &vars)
if err != nil {
return "", err
}

version := fmt.Sprintf("%v", vars)
return version, nil
}

var getVersionFromTablet = getVersionFromTabletDebugVars

// ResetDebugVarsGetVersion is used by tests to reset the
// getVersionFromTablet variable to the default one. That way we can
// run the unit tests in testlib/ even when another implementation of
// getVersionFromTablet is used.
func ResetDebugVarsGetVersion() {
getVersionFromTablet = getVersionFromTabletDebugVars
}

// GetVersion returns the version string from a tablet
func (wr *Wrangler) GetVersion(ctx context.Context, tabletAlias *topodatapb.TabletAlias) (string, error) {
resp, err := wr.VtctldServer().GetVersion(ctx, &vtctldatapb.GetVersionRequest{
Expand Down
Loading