Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplifying topo implementations. #3402

Merged
merged 11 commits into from
Nov 22, 2017
8 changes: 4 additions & 4 deletions go/cmd/topo2topo/topo2topo.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,15 @@ func main() {
ctx := context.Background()

if *doKeyspaces {
helpers.CopyKeyspaces(ctx, fromTS.Impl, toTS.Impl)
helpers.CopyKeyspaces(ctx, fromTS, toTS)
}
if *doShards {
helpers.CopyShards(ctx, fromTS.Impl, toTS.Impl)
helpers.CopyShards(ctx, fromTS, toTS)
}
if *doShardReplications {
helpers.CopyShardReplications(ctx, fromTS.Impl, toTS.Impl)
helpers.CopyShardReplications(ctx, fromTS, toTS)
}
if *doTablets {
helpers.CopyTablets(ctx, fromTS.Impl, toTS.Impl)
helpers.CopyTablets(ctx, fromTS, toTS)
}
}
1 change: 0 additions & 1 deletion go/cmd/vtcombo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ func main() {

// vtctld configuration and init
vtctld.InitVtctld(ts)
vtctld.HandleExplorer("memorytopo", vtctld.NewBackendExplorer(ts.Impl))

servenv.OnTerm(func() {
// FIXME(alainjobart): stop vtgate
Expand Down
15 changes: 2 additions & 13 deletions go/cmd/vtctld/plugin_consultopo.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,8 @@ limitations under the License.

package main

// Imports and register the 'consul' topo.Server and its Explorer.
// Imports and register the 'consul' topo.Server.

import (
"github.com/youtube/vitess/go/vt/servenv"
"github.com/youtube/vitess/go/vt/topo/consultopo"
"github.com/youtube/vitess/go/vt/vtctld"
_ "github.com/youtube/vitess/go/vt/topo/consultopo"
)

func init() {
// Wait until flags are parsed, so we can check which topo server is in use.
servenv.OnRun(func() {
if s, ok := ts.Impl.(*consultopo.Server); ok {
vtctld.HandleExplorer("consul", vtctld.NewBackendExplorer(s))
}
})
}
15 changes: 2 additions & 13 deletions go/cmd/vtctld/plugin_etcd2topo.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,8 @@ limitations under the License.

package main

// Imports and register the 'etcd2' topo.Server and its Explorer.
// Imports and register the 'etcd2' topo.Server.

import (
"github.com/youtube/vitess/go/vt/servenv"
"github.com/youtube/vitess/go/vt/topo/etcd2topo"
"github.com/youtube/vitess/go/vt/vtctld"
_ "github.com/youtube/vitess/go/vt/topo/etcd2topo"
)

func init() {
// Wait until flags are parsed, so we can check which topo server is in use.
servenv.OnRun(func() {
if s, ok := ts.Impl.(*etcd2topo.Server); ok {
vtctld.HandleExplorer("etcd2", vtctld.NewBackendExplorer(s))
}
})
}
15 changes: 2 additions & 13 deletions go/cmd/vtctld/plugin_zk2topo.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,8 @@ limitations under the License.

package main

// Imports and register the 'zk2' topo.Server and its Explorer.
// Imports and register the 'zk2' topo.Server.

import (
"github.com/youtube/vitess/go/vt/servenv"
"github.com/youtube/vitess/go/vt/topo/zk2topo"
"github.com/youtube/vitess/go/vt/vtctld"
_ "github.com/youtube/vitess/go/vt/topo/zk2topo"
)

func init() {
// Wait until flags are parsed, so we can check which topo server is in use.
servenv.OnRun(func() {
if s, ok := ts.Impl.(*zk2topo.Server); ok {
vtctld.HandleExplorer("zk2", vtctld.NewBackendExplorer(s))
}
})
}
2 changes: 1 addition & 1 deletion go/cmd/vttablet/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ var onStatusRegistered func()
func addStatusParts(qsc tabletserver.Controller) {
servenv.AddStatusPart("Tablet", tabletTemplate, func() interface{} {
return map[string]interface{}{
"Tablet": topo.NewTabletInfo(agent.Tablet(), -1),
"Tablet": topo.NewTabletInfo(agent.Tablet(), nil),
"BlacklistedTables": agent.BlacklistedTables(),
"DisallowQueryService": agent.DisallowQueryService(),
"DisableUpdateStream": !agent.EnableUpdateStream(),
Expand Down
192 changes: 44 additions & 148 deletions go/vt/discovery/topology_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@ limitations under the License.
package discovery

import (
"fmt"
"sync"
"testing"
"time"

"github.com/golang/protobuf/proto"
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/topo/test/faketopo"
"github.com/youtube/vitess/go/vt/topo/topoproto"
"github.com/youtube/vitess/go/vt/topo/memorytopo"
"golang.org/x/net/context"
)

Expand All @@ -38,16 +36,13 @@ func TestShardReplicationWatcher(t *testing.T) {
}

func checkWatcher(t *testing.T, cellTablets bool) {
ft := newFakeTopo(cellTablets)
ts := topo.Server{Impl: memorytopo.NewServer("aa")}
fhc := NewFakeHealthCheck()
t.Logf(`ft = FakeTopo(); fhc = FakeHealthCheck()`)
var tw *TopologyWatcher
if cellTablets {
tw = NewCellTabletsWatcher(topo.Server{Impl: ft}, fhc, "aa", 10*time.Minute, 5)
t.Logf(`tw = CellTabletsWatcher(topo.Server{ft}, fhc, "aa", 10ms, 5)`)
tw = NewCellTabletsWatcher(ts, fhc, "aa", 10*time.Minute, 5)
} else {
tw = NewShardReplicationWatcher(topo.Server{Impl: ft}, fhc, "aa", "keyspace", "shard", 10*time.Minute, 5)
t.Logf(`tw = ShardReplicationWatcher(topo.Server{ft}, fhc, "aa", "keyspace", "shard", 10ms, 5)`)
tw = NewShardReplicationWatcher(ts, fhc, "aa", "keyspace", "shard", 10*time.Minute, 5)
}

// Wait for the initial topology load to finish. Otherwise we
Expand All @@ -57,164 +52,65 @@ func checkWatcher(t *testing.T, cellTablets bool) {
t.Fatalf("initial WaitForInitialTopology failed")
}

// add a tablet to the topology
ft.AddTablet("aa", 0, "host1", map[string]int32{"vt": 123})
tw.loadTablets()
t.Logf(`ft.AddTablet("aa", 0, "host1", {"vt": 123}); tw.loadTablets()`)
want := &topodatapb.Tablet{
// Add a tablet to the topology.
tablet := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Uid: 0,
Cell: "aa",
Uid: 0,
},
Hostname: "host1",
PortMap: map[string]int32{"vt": 123},
PortMap: map[string]int32{
"vt": 123,
},
Keyspace: "keyspace",
Shard: "shard",
}
if err := ts.CreateTablet(context.Background(), tablet); err != nil {
t.Fatalf("CreateTablet failed: %v", err)
}
tw.loadTablets()

// Check the tablet is returned by GetAllTablets().
allTablets := fhc.GetAllTablets()
key := TabletToMapKey(want)
if _, ok := allTablets[key]; !ok || len(allTablets) != 1 {
t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, want)
key := TabletToMapKey(tablet)
if _, ok := allTablets[key]; !ok || len(allTablets) != 1 || !proto.Equal(allTablets[key], tablet) {
t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet)
}

// same tablet, different port, should update (previous
// one should go away, new one be added).
ft.AddTablet("aa", 0, "host1", map[string]int32{"vt": 456})
tw.loadTablets()
t.Logf(`ft.AddTablet("aa", 0, "host1", {"vt": 456}); tw.loadTablets()`)
want = &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Uid: 0,
},
Hostname: "host1",
PortMap: map[string]int32{"vt": 456},
tablet.PortMap["vt"] = 456
if _, err := ts.UpdateTabletFields(context.Background(), tablet.Alias, func(t *topodatapb.Tablet) error {
t.PortMap["vt"] = 456
return nil
}); err != nil {
t.Fatalf("UpdateTabletFields failed: %v", err)
}
tw.loadTablets()
allTablets = fhc.GetAllTablets()
key = TabletToMapKey(want)
if _, ok := allTablets[key]; !ok || len(allTablets) != 1 {
t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, want)
key = TabletToMapKey(tablet)
if _, ok := allTablets[key]; !ok || len(allTablets) != 1 || !proto.Equal(allTablets[key], tablet) {
t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet)
}

// Remove and re-add with a new uid. This should trigger a ReplaceTablet in loadTablets,
// because the uid does not match.
ft.RemoveTablet("aa", 0)
ft.AddTablet("aa", 1, "host1", map[string]int32{"vt": 456})
tw.loadTablets()
t.Logf(`ft.ReplaceTablet("aa", 0, "host1", {"vt": 456}); tw.loadTablets()`)
want = &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Uid: 1,
},
Hostname: "host1",
PortMap: map[string]int32{"vt": 456},
}
allTablets = fhc.GetAllTablets()
key = TabletToMapKey(want)
if _, ok := allTablets[key]; !ok || len(allTablets) != 1 {
t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, want)
if err := ts.DeleteTablet(context.Background(), tablet.Alias); err != nil {
t.Fatalf("DeleteTablet failed: %v", err)
}

tw.Stop()
}

type fakeTopo struct {
faketopo.FakeTopo
expectGetTabletsByCell bool

// mu protects the tablets map.
mu sync.RWMutex

// tablets key is topoproto.TabletAliasString(tablet alias).
tablets map[string]*topodatapb.Tablet
}

func newFakeTopo(expectGetTabletsByCell bool) *fakeTopo {
return &fakeTopo{
expectGetTabletsByCell: expectGetTabletsByCell,
tablets: make(map[string]*topodatapb.Tablet),
}
}

func (ft *fakeTopo) AddTablet(cell string, uid uint32, host string, ports map[string]int32) {
ft.mu.Lock()
defer ft.mu.Unlock()
ta := topodatapb.TabletAlias{
Cell: cell,
Uid: uid,
}
tablet := &topodatapb.Tablet{
Alias: &ta,
Hostname: host,
PortMap: make(map[string]int32),
}
for name, port := range ports {
if name == "mysql" {
topoproto.SetMysqlPort(tablet, port)
} else {
tablet.PortMap[name] = port
}
}
ft.tablets[topoproto.TabletAliasString(&ta)] = tablet
}

func (ft *fakeTopo) RemoveTablet(cell string, uid uint32) {
ft.mu.Lock()
defer ft.mu.Unlock()
ta := topodatapb.TabletAlias{
Cell: cell,
Uid: uid,
}
delete(ft.tablets, topoproto.TabletAliasString(&ta))
}

func (ft *fakeTopo) GetTabletsByCell(ctx context.Context, cell string) ([]*topodatapb.TabletAlias, error) {
if !ft.expectGetTabletsByCell {
return nil, fmt.Errorf("unexpected GetTabletsByCell")
}
ft.mu.RLock()
defer ft.mu.RUnlock()
res := make([]*topodatapb.TabletAlias, 0, 1)
for _, tablet := range ft.tablets {
if tablet.Alias.Cell == cell {
res = append(res, tablet.Alias)
}
}
return res, nil
}

// GetShardReplication should return all the nodes in a shard,
// but instead we cheat for this test and just return all the
// tablets in the cell.
func (ft *fakeTopo) GetShardReplication(ctx context.Context, cell, keyspace, shard string) (*topo.ShardReplicationInfo, error) {
if ft.expectGetTabletsByCell {
return nil, fmt.Errorf("unexpected GetShardReplication")
tablet.Alias.Uid = 1
if err := ts.CreateTablet(context.Background(), tablet); err != nil {
t.Fatalf("CreateTablet failed: %v", err)
}
tw.loadTablets()

ft.mu.RLock()
defer ft.mu.RUnlock()
nodes := make([]*topodatapb.ShardReplication_Node, 0, 1)
for _, tablet := range ft.tablets {
if tablet.Alias.Cell == cell {
nodes = append(nodes, &topodatapb.ShardReplication_Node{
TabletAlias: tablet.Alias,
})
}
allTablets = fhc.GetAllTablets()
key = TabletToMapKey(tablet)
if _, ok := allTablets[key]; !ok || len(allTablets) != 1 || !proto.Equal(allTablets[key], tablet) {
t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet)
}
return topo.NewShardReplicationInfo(&topodatapb.ShardReplication{
Nodes: nodes,
}, cell, keyspace, shard), nil
}

func (ft *fakeTopo) GetTablet(ctx context.Context, alias *topodatapb.TabletAlias) (*topodatapb.Tablet, int64, error) {
ft.mu.RLock()
defer ft.mu.RUnlock()
// Note we want to be correct here. The way we call this, we never
// change the tablet list in between a call to list them,
// and a call to get the record, so we could just blindly return it.
// (It wasn't the case before we added the WaitForInitialTopology()
// call in the test though!).
tablet, ok := ft.tablets[topoproto.TabletAliasString(alias)]
if !ok {
return nil, 0, topo.ErrNoNode
}
return tablet, 0, nil
tw.Stop()
}

func TestFilterByShard(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/schemamanager/local_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ func TestLocalControllerOpen(t *testing.T) {
controller := NewLocalController("")
ctx := context.Background()

if err := controller.Open(ctx); err == nil {
t.Fatalf("Open should fail, no such dir")
if err := controller.Open(ctx); err == nil || !strings.Contains(err.Error(), "no such file or directory") {
t.Fatalf("Open should fail, no such dir, but got: %v", err)
}

schemaChangeDir, err := ioutil.TempDir("", "localcontroller-test")
Expand Down
Loading