From 97a5a80f130bbbd0cb7a1baade7daa10bccb7f19 Mon Sep 17 00:00:00 2001 From: Alain Jobart Date: Fri, 19 Jul 2013 15:27:52 -0700 Subject: [PATCH 1/4] Skeleton for a 'Tee' topo.Server for migration purposes. --- Makefile | 2 +- go/cmd/topo2topo/topo2topo.go | 143 +--------- go/vt/topo/{topology_server.go => server.go} | 0 go/vt/topotools/copy.go | 147 ++++++++++ .../topotools/copy_test.go} | 23 +- go/vt/topotools/tee.go | 253 ++++++++++++++++++ go/vt/topotools/tee_test.go | 64 +++++ .../topotools}/test_zk_client.json | 0 8 files changed, 484 insertions(+), 148 deletions(-) rename go/vt/topo/{topology_server.go => server.go} (100%) create mode 100644 go/vt/topotools/copy.go rename go/{cmd/topo2topo/topo2topo_test.go => vt/topotools/copy_test.go} (90%) create mode 100644 go/vt/topotools/tee.go create mode 100644 go/vt/topotools/tee_test.go rename go/{cmd/topo2topo => vt/topotools}/test_zk_client.json (100%) diff --git a/Makefile b/Makefile index c7f95a05980..ab1ad501af4 100644 --- a/Makefile +++ b/Makefile @@ -30,7 +30,6 @@ unit_test: cd go/bytes2; go test cd go/cache; go test cd go/cgzip; go test - cd go/cmd/topo2topo; go test cd go/cmd/zkns2pdns; go test cd go/hack; go test # cd go/logfile; go test @@ -49,6 +48,7 @@ unit_test: cd go/vt/mysqlctl; go test cd go/vt/sqlparser; go test cd go/vt/tabletserver; go test + cd go/vt/topotools; go test cd go/zk; go test cd go/zk/fakezk; go test cd go/zk/zkctl; go test diff --git a/go/cmd/topo2topo/topo2topo.go b/go/cmd/topo2topo/topo2topo.go index 133a6309993..ebfc8917670 100644 --- a/go/cmd/topo2topo/topo2topo.go +++ b/go/cmd/topo2topo/topo2topo.go @@ -7,12 +7,11 @@ package main import ( "flag" "os" - "sync" "github.com/youtube/vitess/go/relog" "github.com/youtube/vitess/go/tb" - "github.com/youtube/vitess/go/vt/concurrency" "github.com/youtube/vitess/go/vt/topo" + "github.com/youtube/vitess/go/vt/topotools" ) var fromTopo = flag.String("from", "", "topology to copy data from") @@ -26,140 +25,6 @@ var deleteKeyspaceShards = flag.Bool("delete-keyspace-shards", false, "when copy var logLevel = flag.String("log.level", "INFO", "set log level") -// copyKeyspaces will create the keyspaces in the destination topo -func copyKeyspaces(fromTS, toTS topo.Server) { - keyspaces, err := fromTS.GetKeyspaces() - if err != nil { - relog.Fatal("fromTS.GetKeyspaces failed: %v", err) - } - - wg := sync.WaitGroup{} - er := concurrency.AllErrorRecorder{} - for _, keyspace := range keyspaces { - wg.Add(1) - go func(keyspace string) { - defer wg.Done() - if err := toTS.CreateKeyspace(keyspace); err != nil { - if err == topo.ErrNodeExists { - relog.Warning("Keyspace %v already exists", keyspace) - } else { - er.RecordError(err) - } - } - }(keyspace) - } - wg.Wait() - if er.HasErrors() { - relog.Fatal("copyKeyspaces failed: %v", err) - } -} - -// copyShards will create the keyspaces in the destination topo -func copyShards(fromTS, toTS topo.Server, deleteKeyspaceShards bool) { - keyspaces, err := fromTS.GetKeyspaces() - if err != nil { - relog.Fatal("fromTS.GetKeyspaces failed: %v", err) - } - - wg := sync.WaitGroup{} - er := concurrency.AllErrorRecorder{} - for _, keyspace := range keyspaces { - wg.Add(1) - go func(keyspace string) { - defer wg.Done() - shards, err := fromTS.GetShardNames(keyspace) - if err != nil { - er.RecordError(err) - return - } - - if deleteKeyspaceShards { - if err := toTS.DeleteKeyspaceShards(keyspace); err != nil { - er.RecordError(err) - return - } - } - - for _, shard := range shards { - wg.Add(1) - go func(keyspace, shard string) { - defer wg.Done() - if err := toTS.CreateShard(keyspace, shard); err != nil { - if err == topo.ErrNodeExists { - relog.Warning("Shard %v/%v already exists", keyspace, shard) - } else { - er.RecordError(err) - } - } - }(keyspace, shard) - } - }(keyspace) - } - wg.Wait() - if er.HasErrors() { - relog.Fatal("copyShards failed: %v", err) - } -} - -// copyTablets will create the tablets in the destination topo -func copyTablets(fromTS, toTS topo.Server) { - cells, err := fromTS.GetKnownCells() - if err != nil { - relog.Fatal("fromTS.GetKnownCells failed: %v", err) - } - - wg := sync.WaitGroup{} - er := concurrency.AllErrorRecorder{} - for _, cell := range cells { - wg.Add(1) - go func(cell string) { - defer wg.Done() - tabletAliases, err := fromTS.GetTabletsByCell(cell) - if err != nil { - er.RecordError(err) - } else { - for _, tabletAlias := range tabletAliases { - wg.Add(1) - go func(tabletAlias topo.TabletAlias) { - defer wg.Done() - - // read the source tablet - ti, err := fromTS.GetTablet(tabletAlias) - if err != nil { - er.RecordError(err) - return - } - - // try to create the destination - err = toTS.CreateTablet(ti.Tablet) - if err == topo.ErrNodeExists { - // update the destination tablet - _, err = toTS.UpdateTablet(ti, -1) - - } - if err != nil { - er.RecordError(err) - return - } - - // create the replication paths - // for masters only here - if ti.Type == topo.TYPE_MASTER { - if err = toTS.CreateReplicationPath(ti.Keyspace, ti.Shard, ti.Alias().String()); err != nil && err != topo.ErrNodeExists { - er.RecordError(err) - } - } - }(tabletAlias) - } - } - }(cell) - } - wg.Wait() - if er.HasErrors() { - relog.Fatal("copyTablets failed: %v", er.Error()) - } -} - func main() { defer func() { if panicErr := recover(); panicErr != nil { @@ -188,12 +53,12 @@ func main() { toTS := topo.GetServerByName(*toTopo) if *doKeyspaces { - copyKeyspaces(fromTS, toTS) + topotools.CopyKeyspaces(fromTS, toTS) } if *doShards { - copyShards(fromTS, toTS, *deleteKeyspaceShards) + topotools.CopyShards(fromTS, toTS, *deleteKeyspaceShards) } if *doTablets { - copyTablets(fromTS, toTS) + topotools.CopyTablets(fromTS, toTS) } } diff --git a/go/vt/topo/topology_server.go b/go/vt/topo/server.go similarity index 100% rename from go/vt/topo/topology_server.go rename to go/vt/topo/server.go diff --git a/go/vt/topotools/copy.go b/go/vt/topotools/copy.go new file mode 100644 index 00000000000..59ea14e6b5d --- /dev/null +++ b/go/vt/topotools/copy.go @@ -0,0 +1,147 @@ +// Copyright 2013, Google Inc. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package topotools + +import ( + "sync" + + "github.com/youtube/vitess/go/relog" + "github.com/youtube/vitess/go/vt/concurrency" + "github.com/youtube/vitess/go/vt/topo" +) + +// CopyKeyspaces will create the keyspaces in the destination topo +func CopyKeyspaces(fromTS, toTS topo.Server) { + keyspaces, err := fromTS.GetKeyspaces() + if err != nil { + relog.Fatal("fromTS.GetKeyspaces failed: %v", err) + } + + wg := sync.WaitGroup{} + er := concurrency.AllErrorRecorder{} + for _, keyspace := range keyspaces { + wg.Add(1) + go func(keyspace string) { + defer wg.Done() + if err := toTS.CreateKeyspace(keyspace); err != nil { + if err == topo.ErrNodeExists { + relog.Warning("Keyspace %v already exists", keyspace) + } else { + er.RecordError(err) + } + } + }(keyspace) + } + wg.Wait() + if er.HasErrors() { + relog.Fatal("copyKeyspaces failed: %v", err) + } +} + +// CopyShards will create the keyspaces in the destination topo +func CopyShards(fromTS, toTS topo.Server, deleteKeyspaceShards bool) { + keyspaces, err := fromTS.GetKeyspaces() + if err != nil { + relog.Fatal("fromTS.GetKeyspaces failed: %v", err) + } + + wg := sync.WaitGroup{} + er := concurrency.AllErrorRecorder{} + for _, keyspace := range keyspaces { + wg.Add(1) + go func(keyspace string) { + defer wg.Done() + shards, err := fromTS.GetShardNames(keyspace) + if err != nil { + er.RecordError(err) + return + } + + if deleteKeyspaceShards { + if err := toTS.DeleteKeyspaceShards(keyspace); err != nil { + er.RecordError(err) + return + } + } + + for _, shard := range shards { + wg.Add(1) + go func(keyspace, shard string) { + defer wg.Done() + if err := toTS.CreateShard(keyspace, shard); err != nil { + if err == topo.ErrNodeExists { + relog.Warning("Shard %v/%v already exists", keyspace, shard) + } else { + er.RecordError(err) + } + } + }(keyspace, shard) + } + }(keyspace) + } + wg.Wait() + if er.HasErrors() { + relog.Fatal("copyShards failed: %v", err) + } +} + +// CopyTablets will create the tablets in the destination topo +func CopyTablets(fromTS, toTS topo.Server) { + cells, err := fromTS.GetKnownCells() + if err != nil { + relog.Fatal("fromTS.GetKnownCells failed: %v", err) + } + + wg := sync.WaitGroup{} + er := concurrency.AllErrorRecorder{} + for _, cell := range cells { + wg.Add(1) + go func(cell string) { + defer wg.Done() + tabletAliases, err := fromTS.GetTabletsByCell(cell) + if err != nil { + er.RecordError(err) + } else { + for _, tabletAlias := range tabletAliases { + wg.Add(1) + go func(tabletAlias topo.TabletAlias) { + defer wg.Done() + + // read the source tablet + ti, err := fromTS.GetTablet(tabletAlias) + if err != nil { + er.RecordError(err) + return + } + + // try to create the destination + err = toTS.CreateTablet(ti.Tablet) + if err == topo.ErrNodeExists { + // update the destination tablet + _, err = toTS.UpdateTablet(ti, -1) + + } + if err != nil { + er.RecordError(err) + return + } + + // create the replication paths + // for masters only here + if ti.Type == topo.TYPE_MASTER { + if err = toTS.CreateReplicationPath(ti.Keyspace, ti.Shard, ti.Alias().String()); err != nil && err != topo.ErrNodeExists { + er.RecordError(err) + } + } + }(tabletAlias) + } + } + }(cell) + } + wg.Wait() + if er.HasErrors() { + relog.Fatal("copyTablets failed: %v", er.Error()) + } +} diff --git a/go/cmd/topo2topo/topo2topo_test.go b/go/vt/topotools/copy_test.go similarity index 90% rename from go/cmd/topo2topo/topo2topo_test.go rename to go/vt/topotools/copy_test.go index 0687f05b78b..2ad2a627ca2 100644 --- a/go/cmd/topo2topo/topo2topo_test.go +++ b/go/vt/topotools/copy_test.go @@ -2,7 +2,7 @@ // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. -package main +package topotools import ( "os" @@ -17,7 +17,7 @@ import ( "launchpad.net/gozk/zookeeper" ) -func TestBasic(t *testing.T) { +func createSetup(t *testing.T) (topo.Server, topo.Server) { fromConn := fakezk.NewConn() fromTS := zktopo.NewServer(fromConn) @@ -79,8 +79,15 @@ func TestBasic(t *testing.T) { } relog.Info("Cells: %v", cells) + return fromTS, toTS +} + +func TestBasic(t *testing.T) { + + fromTS, toTS := createSetup(t) + // check keyspace copy - copyKeyspaces(fromTS, toTS) + CopyKeyspaces(fromTS, toTS) keyspaces, err := toTS.GetKeyspaces() if err != nil { t.Fatalf("toTS.GetKeyspaces failed: %v", err) @@ -88,10 +95,10 @@ func TestBasic(t *testing.T) { if len(keyspaces) != 1 || keyspaces[0] != "test_keyspace" { t.Fatalf("unexpected keyspaces: %v", keyspaces) } - copyKeyspaces(fromTS, toTS) + CopyKeyspaces(fromTS, toTS) // check shard copy - copyShards(fromTS, toTS, true) + CopyShards(fromTS, toTS, true) shards, err := toTS.GetShardNames("test_keyspace") if err != nil { t.Fatalf("toTS.GetShardNames failed: %v", err) @@ -99,10 +106,10 @@ func TestBasic(t *testing.T) { if len(shards) != 1 || shards[0] != "0" { t.Fatalf("unexpected shards: %v", shards) } - copyShards(fromTS, toTS, false) + CopyShards(fromTS, toTS, false) // check tablet copy - copyTablets(fromTS, toTS) + CopyTablets(fromTS, toTS) tablets, err := toTS.GetTabletsByCell("test_cell") if err != nil { t.Fatalf("toTS.GetTabletsByCell failed: %v", err) @@ -110,5 +117,5 @@ func TestBasic(t *testing.T) { if len(tablets) != 2 || tablets[0].Uid != 123 || tablets[1].Uid != 234 { t.Fatalf("unexpected tablets: %v", tablets) } - copyTablets(fromTS, toTS) + CopyTablets(fromTS, toTS) } diff --git a/go/vt/topotools/tee.go b/go/vt/topotools/tee.go new file mode 100644 index 00000000000..45be1781ba6 --- /dev/null +++ b/go/vt/topotools/tee.go @@ -0,0 +1,253 @@ +// Copyright 2013, Google Inc. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package topotools + +import ( + "time" + + "github.com/youtube/vitess/go/relog" + "github.com/youtube/vitess/go/vt/topo" +) + +// Tee is an implementation of topo.Server that uses a primary +// underlying topo.Server for all changes, but also duplicates the +// changes to a secondary topo.Server. It also locks both topo servers +// when needed. It is meant to be used during transitions from one +// topo.Server to another. +type Tee struct { + primary topo.Server + secondary topo.Server +} + +func NewTee(primary, secondary topo.Server) *Tee { + return &Tee{primary, secondary} +} + +// topo.Server management interface. + +func (tee *Tee) Close() { + tee.primary.Close() + tee.secondary.Close() +} + +// Cell management, global + +func (tee *Tee) GetKnownCells() ([]string, error) { + return tee.primary.GetKnownCells() +} + +// Keyspace management, global. + +func (tee *Tee) CreateKeyspace(keyspace string) error { + if err := tee.primary.CreateKeyspace(keyspace); err != nil { + return err + } + + // this is critical enough we want to fail + if err := tee.secondary.CreateKeyspace(keyspace); err != nil { + return err + } + return nil +} + +func (tee *Tee) GetKeyspaces() ([]string, error) { + return tee.primary.GetKeyspaces() +} + +func (tee *Tee) DeleteKeyspaceShards(keyspace string) error { + if err := tee.primary.DeleteKeyspaceShards(keyspace); err != nil { + return err + } + + if err := tee.secondary.DeleteKeyspaceShards(keyspace); err != nil { + // not critical enough to fail + relog.Warning("secondary.DeleteKeyspaceShards(%v) failed: %v", keyspace, err) + } + return nil +} + +// Shard management, global. + +func (tee *Tee) CreateShard(keyspace, shard string) error { + err := tee.primary.CreateShard(keyspace, shard) + if err != nil && err != topo.ErrNodeExists { + return err + } + + serr := tee.secondary.CreateShard(keyspace, shard) + if serr != nil && err != topo.ErrNodeExists { + // not critical enough to fail + relog.Warning("secondary.CreateShard(%v,%v) failed: %v", keyspace, shard, err) + } + return err +} + +func (tee *Tee) UpdateShard(si *topo.ShardInfo) error { + return tee.primary.UpdateShard(si) +} + +func (tee *Tee) ValidateShard(keyspace, shard string) error { + return tee.primary.ValidateShard(keyspace, shard) +} + +func (tee *Tee) GetShard(keyspace, shard string) (si *topo.ShardInfo, err error) { + return tee.primary.GetShard(keyspace, shard) +} + +func (tee *Tee) GetShardNames(keyspace string) ([]string, error) { + return tee.primary.GetShardNames(keyspace ) +} + +// Tablet management, per cell. + +func (tee *Tee) CreateTablet(tablet *topo.Tablet) error { + return tee.primary.CreateTablet(tablet) +} + +func (tee *Tee) UpdateTablet(tablet *topo.TabletInfo, existingVersion int) (newVersion int, err error) { + return tee.primary.UpdateTablet(tablet, existingVersion) +} + +func (tee *Tee) UpdateTabletFields(tabletAlias topo.TabletAlias, update func(*topo.Tablet) error) error { + return tee.primary.UpdateTabletFields(tabletAlias, update) +} + +func (tee *Tee) DeleteTablet(alias topo.TabletAlias) error { + return tee.primary.DeleteTablet(alias) +} + +func (tee *Tee) ValidateTablet(alias topo.TabletAlias) error { + return tee.primary.ValidateTablet(alias) +} + +func (tee *Tee) GetTablet(alias topo.TabletAlias) (*topo.TabletInfo, error) { + return tee.primary.GetTablet(alias) +} + +func (tee *Tee) GetTabletsByCell(cell string) ([]topo.TabletAlias, error) { + return tee.primary.GetTabletsByCell(cell) +} + +// Replication graph management, global. + +func (tee *Tee) GetReplicationPaths(keyspace, shard, repPath string) ([]topo.TabletAlias, error) { + return tee.primary.GetReplicationPaths(keyspace, shard, repPath) +} + +func (tee *Tee) CreateReplicationPath(keyspace, shard, repPath string) error { + return tee.primary.CreateReplicationPath(keyspace, shard, repPath) +} + +func (tee *Tee) DeleteReplicationPath(keyspace, shard, repPath string) error { + return tee.primary.DeleteReplicationPath(keyspace, shard, repPath) +} + +// Serving Graph management, per cell. + +func (tee *Tee) GetSrvTabletTypesPerShard(cell, keyspace, shard string) ([]topo.TabletType, error) { + return tee.primary.GetSrvTabletTypesPerShard(cell, keyspace, shard) +} + +func (tee *Tee) UpdateSrvTabletType(cell, keyspace, shard string, tabletType topo.TabletType, addrs *topo.VtnsAddrs) error { + return tee.primary.UpdateSrvTabletType(cell, keyspace, shard, tabletType, addrs) +} + +func (tee *Tee) GetSrvTabletType(cell, keyspace, shard string, tabletType topo.TabletType) (*topo.VtnsAddrs, error) { + return tee.primary.GetSrvTabletType(cell, keyspace, shard, tabletType) +} + +func (tee *Tee) DeleteSrvTabletType(cell, keyspace, shard string, tabletType topo.TabletType) error { + return tee.primary.DeleteSrvTabletType(cell, keyspace, shard, tabletType) +} + +func (tee *Tee) UpdateSrvShard(cell, keyspace, shard string, srvShard *topo.SrvShard) error { + return tee.primary.UpdateSrvShard(cell, keyspace, shard, srvShard) +} + +func (tee *Tee) GetSrvShard(cell, keyspace, shard string) (*topo.SrvShard, error) { + return tee.primary.GetSrvShard(cell, keyspace, shard) +} + +func (tee *Tee) UpdateSrvKeyspace(cell, keyspace string, srvKeyspace *topo.SrvKeyspace) error { + return tee.primary.UpdateSrvKeyspace(cell, keyspace, srvKeyspace) +} + +func (tee *Tee) GetSrvKeyspace(cell, keyspace string) (*topo.SrvKeyspace, error) { + return tee.primary.GetSrvKeyspace(cell, keyspace) +} + +func (tee *Tee) UpdateTabletEndpoint(cell, keyspace, shard string, tabletType topo.TabletType, addr *topo.VtnsAddr) error { + return tee.primary.UpdateTabletEndpoint(cell, keyspace, shard, tabletType, addr) +} + +// Keyspace and Shard locks for actions, global. + +func (tee *Tee) LockKeyspaceForAction(keyspace, contents string, timeout time.Duration, interrupted chan struct{}) (string, error) { + return tee.primary.LockKeyspaceForAction(keyspace, contents, timeout, interrupted) +} + +func (tee *Tee) UnlockKeyspaceForAction(keyspace, lockPath, results string) error { + return tee.primary.UnlockKeyspaceForAction(keyspace, lockPath, results) +} + +func (tee *Tee) LockShardForAction(keyspace, shard, contents string, timeout time.Duration, interrupted chan struct{}) (string, error) { + return tee.primary.LockShardForAction(keyspace, shard, contents, timeout, interrupted) +} + +func (tee *Tee) UnlockShardForAction(keyspace, shard, lockPath, results string) error { + return tee.primary.UnlockShardForAction(keyspace, shard, lockPath, results) +} + +// Remote Tablet Actions, local cell. + +func (tee *Tee) WriteTabletAction(tabletAlias topo.TabletAlias, contents string) (string, error) { + return tee.primary.WriteTabletAction(tabletAlias, contents) +} + +func (tee *Tee) WaitForTabletAction(actionPath string, waitTime time.Duration, interrupted chan struct{}) (string, error) { + return tee.primary.WaitForTabletAction(actionPath, waitTime, interrupted) +} + +func (tee *Tee) PurgeTabletActions(tabletAlias topo.TabletAlias, canBePurged func(data string) bool) error { + return tee.primary.PurgeTabletActions(tabletAlias, canBePurged) +} + +// Supporting the local agent process, local cell. + +func (tee *Tee) ValidateTabletActions(tabletAlias topo.TabletAlias) error { + return tee.primary.ValidateTabletActions(tabletAlias) +} + +func (tee *Tee) CreateTabletPidNode(tabletAlias topo.TabletAlias, done chan struct{}) error { + return tee.primary.CreateTabletPidNode(tabletAlias, done) +} + +func (tee *Tee) ValidateTabletPidNode(tabletAlias topo.TabletAlias) error { + return tee.primary.ValidateTabletPidNode(tabletAlias) +} + +func (tee *Tee) GetSubprocessFlags() []string { + return tee.primary.GetSubprocessFlags() +} + +func (tee *Tee) ActionEventLoop(tabletAlias topo.TabletAlias, dispatchAction func(actionPath, data string) error, done chan struct{}) { + tee.primary.ActionEventLoop(tabletAlias, dispatchAction, done) +} + +func (tee *Tee) ReadTabletActionPath(actionPath string) (topo.TabletAlias, string, int, error) { + return tee.primary.ReadTabletActionPath(actionPath) +} + +func (tee *Tee) UpdateTabletAction(actionPath, data string, version int) error { + return tee.primary.UpdateTabletAction(actionPath, data, version) +} + +func (tee *Tee) StoreTabletActionResponse(actionPath, data string) error { + return tee.primary.StoreTabletActionResponse(actionPath, data) +} + +func (tee *Tee) UnblockTabletAction(actionPath string) error { + return tee.primary.UnblockTabletAction(actionPath) +} diff --git a/go/vt/topotools/tee_test.go b/go/vt/topotools/tee_test.go new file mode 100644 index 00000000000..738c41e0bb5 --- /dev/null +++ b/go/vt/topotools/tee_test.go @@ -0,0 +1,64 @@ +// Copyright 2013, Google Inc. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package topotools + +import ( + "testing" + +// "github.com/youtube/vitess/go/vt/key" + "github.com/youtube/vitess/go/vt/topo" +// "github.com/youtube/vitess/go/vt/zktopo" +// "github.com/youtube/vitess/go/zk" +// "github.com/youtube/vitess/go/zk/fakezk" +// "launchpad.net/gozk/zookeeper" +) + +func testConversion(topo.Server) { +} + +func testStringArrays(t *testing.T, expected, value []string) { + if len(expected) != len(value) { + t.Fatalf("Different arrays:\n%v\n%v", expected, value) + } + for i, e := range expected { + if e != value[i] { + t.Fatalf("Different arrays:\n%v\n%v", expected, value) + } + } +} + +func TestTee(t *testing.T) { + + // create the setup, copy the data + fromTS, toTS := createSetup(t) + CopyKeyspaces(fromTS, toTS) + CopyShards(fromTS, toTS, true) + CopyTablets(fromTS, toTS) + + // create a tee and check it implements the interface + tee := NewTee(fromTS, toTS) + testConversion(tee) + + // create a keyspace, make sure it is on both sides + if err := tee.CreateKeyspace("keyspace2"); err != nil { + t.Fatalf("tee.CreateKeyspace(keyspace2) failed: %v", err) + } + teeKeyspaces, err := tee.GetKeyspaces() + if err != nil { + t.Fatalf("tee.GetKeyspaces() failed: %v", err) + } + testStringArrays(t, []string{"keyspace2", "test_keyspace"}, teeKeyspaces) + fromKeyspaces, err := fromTS.GetKeyspaces() + if err != nil { + t.Fatalf("fromTS.GetKeyspaces() failed: %v", err) + } + testStringArrays(t, []string{"keyspace2", "test_keyspace"}, fromKeyspaces) + toKeyspaces, err := toTS.GetKeyspaces() + if err != nil { + t.Fatalf("toTS.GetKeyspaces() failed: %v", err) + } + testStringArrays(t, []string{"keyspace2", "test_keyspace"}, toKeyspaces) + +} \ No newline at end of file diff --git a/go/cmd/topo2topo/test_zk_client.json b/go/vt/topotools/test_zk_client.json similarity index 100% rename from go/cmd/topo2topo/test_zk_client.json rename to go/vt/topotools/test_zk_client.json From 9fca690e0f869f6a07b9c9d8d471d072b6c8f41e Mon Sep 17 00:00:00 2001 From: Alain Jobart Date: Fri, 19 Jul 2013 17:02:56 -0700 Subject: [PATCH 2/4] Now the Tee supports most operations. --- go/vt/topotools/tee.go | 315 ++++++++++++++++++++++++++++++++---- go/vt/topotools/tee_test.go | 2 +- 2 files changed, 281 insertions(+), 36 deletions(-) diff --git a/go/vt/topotools/tee.go b/go/vt/topotools/tee.go index 45be1781ba6..90418c217e6 100644 --- a/go/vt/topotools/tee.go +++ b/go/vt/topotools/tee.go @@ -5,6 +5,7 @@ package topotools import ( + "fmt" "time" "github.com/youtube/vitess/go/relog" @@ -16,29 +17,61 @@ import ( // changes to a secondary topo.Server. It also locks both topo servers // when needed. It is meant to be used during transitions from one // topo.Server to another. +// +// - primary: we read everything from it, and write to it +// - secondary: we write to it as well, but we usually don't fail. +// - we lock primary/secondary if reverseLockOrder is False, +// or secondary/primary is reverseLockOrder is True. type Tee struct { - primary topo.Server - secondary topo.Server + primary topo.Server + secondary topo.Server + + readFrom topo.Server + lockFirst topo.Server + lockSecond topo.Server + + keyspaceLockPaths map[string]string + shardLockPaths map[string]string } -func NewTee(primary, secondary topo.Server) *Tee { - return &Tee{primary, secondary} +func NewTee(primary, secondary topo.Server, reverseLockOrder bool) *Tee { + lockFirst := primary + lockSecond := secondary + if reverseLockOrder { + lockFirst = secondary + lockSecond = primary + } + return &Tee{ + primary: primary, + secondary: secondary, + readFrom: primary, + lockFirst: lockFirst, + lockSecond: lockSecond, + keyspaceLockPaths: make(map[string]string), + shardLockPaths: make(map[string]string), + } } +// // topo.Server management interface. +// func (tee *Tee) Close() { tee.primary.Close() tee.secondary.Close() } +// // Cell management, global +// func (tee *Tee) GetKnownCells() ([]string, error) { - return tee.primary.GetKnownCells() + return tee.readFrom.GetKnownCells() } +// // Keyspace management, global. +// func (tee *Tee) CreateKeyspace(keyspace string) error { if err := tee.primary.CreateKeyspace(keyspace); err != nil { @@ -53,7 +86,7 @@ func (tee *Tee) CreateKeyspace(keyspace string) error { } func (tee *Tee) GetKeyspaces() ([]string, error) { - return tee.primary.GetKeyspaces() + return tee.readFrom.GetKeyspaces() } func (tee *Tee) DeleteKeyspaceShards(keyspace string) error { @@ -68,7 +101,9 @@ func (tee *Tee) DeleteKeyspaceShards(keyspace string) error { return nil } +// // Shard management, global. +// func (tee *Tee) CreateShard(keyspace, shard string) error { err := tee.primary.CreateShard(keyspace, shard) @@ -77,7 +112,7 @@ func (tee *Tee) CreateShard(keyspace, shard string) error { } serr := tee.secondary.CreateShard(keyspace, shard) - if serr != nil && err != topo.ErrNodeExists { + if serr != nil && serr != topo.ErrNodeExists { // not critical enough to fail relog.Warning("secondary.CreateShard(%v,%v) failed: %v", keyspace, shard, err) } @@ -85,169 +120,379 @@ func (tee *Tee) CreateShard(keyspace, shard string) error { } func (tee *Tee) UpdateShard(si *topo.ShardInfo) error { - return tee.primary.UpdateShard(si) + if err := tee.primary.UpdateShard(si); err != nil { + // failed on primary, not updating secondary + return err + } + + if err := tee.secondary.UpdateShard(si); err != nil { + // not critical enough to fail + relog.Warning("secondary.UpdateShard(%v,%v) failed: %v", si.Keyspace(), si.ShardName(), err) + } + return nil } func (tee *Tee) ValidateShard(keyspace, shard string) error { - return tee.primary.ValidateShard(keyspace, shard) + err := tee.primary.ValidateShard(keyspace, shard) + if err != nil { + return err + } + + if err := tee.secondary.ValidateShard(keyspace, shard); err != nil { + // not critical enough to fail + relog.Warning("secondary.ValidateShard(%v,%v) failed: %v", keyspace, shard, err) + } + return nil } func (tee *Tee) GetShard(keyspace, shard string) (si *topo.ShardInfo, err error) { - return tee.primary.GetShard(keyspace, shard) + return tee.readFrom.GetShard(keyspace, shard) } func (tee *Tee) GetShardNames(keyspace string) ([]string, error) { - return tee.primary.GetShardNames(keyspace ) + return tee.readFrom.GetShardNames(keyspace ) } +// // Tablet management, per cell. +// func (tee *Tee) CreateTablet(tablet *topo.Tablet) error { - return tee.primary.CreateTablet(tablet) + err := tee.primary.CreateTablet(tablet) + if err != nil && err != topo.ErrNodeExists { + return err + } + + if err := tee.primary.CreateTablet(tablet); err != nil && err != topo.ErrNodeExists { + // not critical enough to fail + relog.Warning("secondary.CreateTablet(%v) failed: %v", tablet.Alias(), err) + } + return err } func (tee *Tee) UpdateTablet(tablet *topo.TabletInfo, existingVersion int) (newVersion int, err error) { - return tee.primary.UpdateTablet(tablet, existingVersion) + if newVersion, err = tee.primary.UpdateTablet(tablet, existingVersion); err != nil { + // failed on primary, not updating secondary + return + } + + if _, err := tee.secondary.UpdateTablet(tablet, existingVersion); err != nil { + // not critical enough to fail + relog.Warning("secondary.UpdateTablet(%v) failed: %v", tablet.Alias(), err) + } + return } func (tee *Tee) UpdateTabletFields(tabletAlias topo.TabletAlias, update func(*topo.Tablet) error) error { - return tee.primary.UpdateTabletFields(tabletAlias, update) + if err := tee.primary.UpdateTabletFields(tabletAlias, update); err != nil { + // failed on primary, not updating secondary + return err + } + + if err := tee.secondary.UpdateTabletFields(tabletAlias, update); err != nil { + // not critical enough to fail + relog.Warning("secondary.UpdateTabletFields(%v) failed: %v", tabletAlias, err) + } + return nil } func (tee *Tee) DeleteTablet(alias topo.TabletAlias) error { - return tee.primary.DeleteTablet(alias) + if err := tee.primary.DeleteTablet(alias); err != nil { + return err + } + + if err := tee.secondary.DeleteTablet(alias); err != nil { + // not critical enough to fail + relog.Warning("secondary.DeleteTablet(%v) failed: %v", alias, err) + } + return nil } func (tee *Tee) ValidateTablet(alias topo.TabletAlias) error { - return tee.primary.ValidateTablet(alias) + if err := tee.primary.ValidateTablet(alias); err != nil { + return err + } + + if err := tee.secondary.ValidateTablet(alias); err != nil { + // not critical enough to fail + relog.Warning("secondary.ValidateTablet(%v) failed: %v", alias, err) + } + return nil } func (tee *Tee) GetTablet(alias topo.TabletAlias) (*topo.TabletInfo, error) { - return tee.primary.GetTablet(alias) + return tee.readFrom.GetTablet(alias) } func (tee *Tee) GetTabletsByCell(cell string) ([]topo.TabletAlias, error) { - return tee.primary.GetTabletsByCell(cell) + return tee.readFrom.GetTabletsByCell(cell) } +// // Replication graph management, global. +// func (tee *Tee) GetReplicationPaths(keyspace, shard, repPath string) ([]topo.TabletAlias, error) { - return tee.primary.GetReplicationPaths(keyspace, shard, repPath) + return tee.readFrom.GetReplicationPaths(keyspace, shard, repPath) } func (tee *Tee) CreateReplicationPath(keyspace, shard, repPath string) error { - return tee.primary.CreateReplicationPath(keyspace, shard, repPath) + if err := tee.primary.CreateReplicationPath(keyspace, shard, repPath); err != nil { + return err + } + + if err := tee.secondary.CreateReplicationPath(keyspace, shard, repPath); err != nil { + // not critical enough to fail + relog.Warning("secondary.CreateReplicationPath(%v, %v, %v) failed: %v", keyspace, shard, repPath, err) + } + return nil } func (tee *Tee) DeleteReplicationPath(keyspace, shard, repPath string) error { - return tee.primary.DeleteReplicationPath(keyspace, shard, repPath) + if err := tee.primary.DeleteReplicationPath(keyspace, shard, repPath); err != nil { + return err + } + + if err := tee.secondary.DeleteReplicationPath(keyspace, shard, repPath); err != nil { + // not critical enough to fail + relog.Warning("secondary.DeleteReplicationPath(%v, %v, %v) failed: %v", keyspace, shard, repPath, err) + } + return nil } +// // Serving Graph management, per cell. +// func (tee *Tee) GetSrvTabletTypesPerShard(cell, keyspace, shard string) ([]topo.TabletType, error) { - return tee.primary.GetSrvTabletTypesPerShard(cell, keyspace, shard) + return tee.readFrom.GetSrvTabletTypesPerShard(cell, keyspace, shard) } func (tee *Tee) UpdateSrvTabletType(cell, keyspace, shard string, tabletType topo.TabletType, addrs *topo.VtnsAddrs) error { - return tee.primary.UpdateSrvTabletType(cell, keyspace, shard, tabletType, addrs) + if err := tee.primary.UpdateSrvTabletType(cell, keyspace, shard, tabletType, addrs); err != nil { + return err + } + + if err := tee.secondary.UpdateSrvTabletType(cell, keyspace, shard, tabletType, addrs); err != nil { + // not critical enough to fail + relog.Warning("secondary.UpdateSrvTabletType(%v, %v, %v, %v) failed: %v", cell, keyspace, shard, tabletType, err) + } + return nil } func (tee *Tee) GetSrvTabletType(cell, keyspace, shard string, tabletType topo.TabletType) (*topo.VtnsAddrs, error) { - return tee.primary.GetSrvTabletType(cell, keyspace, shard, tabletType) + return tee.readFrom.GetSrvTabletType(cell, keyspace, shard, tabletType) } func (tee *Tee) DeleteSrvTabletType(cell, keyspace, shard string, tabletType topo.TabletType) error { - return tee.primary.DeleteSrvTabletType(cell, keyspace, shard, tabletType) + if err := tee.primary.DeleteSrvTabletType(cell, keyspace, shard, tabletType); err != nil { + return err + } + + if err := tee.secondary.DeleteSrvTabletType(cell, keyspace, shard, tabletType); err != nil { + // not critical enough to fail + relog.Warning("secondary.DeleteSrvTabletType(%v, %v, %v, %v) failed: %v", cell, keyspace, shard, tabletType, err) + } + return nil } func (tee *Tee) UpdateSrvShard(cell, keyspace, shard string, srvShard *topo.SrvShard) error { - return tee.primary.UpdateSrvShard(cell, keyspace, shard, srvShard) + if err := tee.primary.UpdateSrvShard(cell, keyspace, shard, srvShard); err != nil { + return err + } + + if err := tee.secondary.UpdateSrvShard(cell, keyspace, shard, srvShard); err != nil { + // not critical enough to fail + relog.Warning("secondary.UpdateSrvShard(%v, %v, %v) failed: %v", cell, keyspace, shard, err) + } + return nil } func (tee *Tee) GetSrvShard(cell, keyspace, shard string) (*topo.SrvShard, error) { - return tee.primary.GetSrvShard(cell, keyspace, shard) + return tee.readFrom.GetSrvShard(cell, keyspace, shard) } func (tee *Tee) UpdateSrvKeyspace(cell, keyspace string, srvKeyspace *topo.SrvKeyspace) error { - return tee.primary.UpdateSrvKeyspace(cell, keyspace, srvKeyspace) + if err := tee.primary.UpdateSrvKeyspace(cell, keyspace, srvKeyspace); err != nil { + return err + } + + if err := tee.secondary.UpdateSrvKeyspace(cell, keyspace, srvKeyspace); err != nil { + // not critical enough to fail + relog.Warning("secondary.UpdateSrvKeyspace(%v, %v) failed: %v", cell, keyspace, err) + } + return nil } func (tee *Tee) GetSrvKeyspace(cell, keyspace string) (*topo.SrvKeyspace, error) { - return tee.primary.GetSrvKeyspace(cell, keyspace) + return tee.readFrom.GetSrvKeyspace(cell, keyspace) } func (tee *Tee) UpdateTabletEndpoint(cell, keyspace, shard string, tabletType topo.TabletType, addr *topo.VtnsAddr) error { - return tee.primary.UpdateTabletEndpoint(cell, keyspace, shard, tabletType, addr) + if err := tee.primary.UpdateTabletEndpoint(cell, keyspace, shard, tabletType, addr); err != nil { + return err + } + + if err := tee.secondary.UpdateTabletEndpoint(cell, keyspace, shard, tabletType, addr); err != nil { + // not critical enough to fail + relog.Warning("secondary.UpdateTabletEndpoint(%v, %v, %v, %v) failed: %v", cell, keyspace, shard, tabletType, err) + } + return nil } +// // Keyspace and Shard locks for actions, global. +// func (tee *Tee) LockKeyspaceForAction(keyspace, contents string, timeout time.Duration, interrupted chan struct{}) (string, error) { - return tee.primary.LockKeyspaceForAction(keyspace, contents, timeout, interrupted) + // lock lockFirst + pLockPath, err := tee.lockFirst.LockKeyspaceForAction(keyspace, contents, timeout, interrupted) + if err != nil { + return "", err + } + + // lock lockSecond + sLockPath, err := tee.lockSecond.LockKeyspaceForAction(keyspace, contents, timeout, interrupted) + if err != nil { + if err := tee.lockFirst.UnlockKeyspaceForAction(keyspace, pLockPath, "{}"); err != nil { + relog.Warning("Failed to unlock lockFirst keyspace after failed lockSecond lock for %v", keyspace) + } + return "", err + } + + // remember both locks, keyed by lockFirst lock path + tee.keyspaceLockPaths[pLockPath] = sLockPath + return pLockPath, nil } func (tee *Tee) UnlockKeyspaceForAction(keyspace, lockPath, results string) error { - return tee.primary.UnlockKeyspaceForAction(keyspace, lockPath, results) + // get from map + sLockPath, ok := tee.keyspaceLockPaths[lockPath] + if !ok { + return fmt.Errorf("no lockPath %v in keyspaceLockPaths", lockPath) + } + delete(tee.keyspaceLockPaths, lockPath) + + // unlock lockSecond, then lockFirst + serr := tee.lockSecond.UnlockKeyspaceForAction(keyspace, sLockPath, results) + perr := tee.lockFirst.UnlockKeyspaceForAction(keyspace, lockPath, results) + + if serr != nil { + if perr != nil { + relog.Warning("Secondary UnlockKeyspaceForAction(%v, %v) failed: %v", keyspace, sLockPath, serr) + } + return serr + } + return perr } func (tee *Tee) LockShardForAction(keyspace, shard, contents string, timeout time.Duration, interrupted chan struct{}) (string, error) { - return tee.primary.LockShardForAction(keyspace, shard, contents, timeout, interrupted) + // lock lockFirst + pLockPath, err := tee.lockFirst.LockShardForAction(keyspace, shard, contents, timeout, interrupted) + if err != nil { + return "", err + } + + // lock lockSecond + sLockPath, err := tee.lockSecond.LockShardForAction(keyspace, shard, contents, timeout, interrupted) + if err != nil { + if err := tee.lockFirst.UnlockShardForAction(keyspace, shard, pLockPath, "{}"); err != nil { + relog.Warning("Failed to unlock lockFirst shard after failed lockSecond lock for %v/%v", keyspace, shard) + } + return "", err + } + + // remember both locks, keyed by lockFirst lock path + tee.shardLockPaths[pLockPath] = sLockPath + return pLockPath, nil } func (tee *Tee) UnlockShardForAction(keyspace, shard, lockPath, results string) error { - return tee.primary.UnlockShardForAction(keyspace, shard, lockPath, results) + // get from map + sLockPath, ok := tee.shardLockPaths[lockPath] + if !ok { + return fmt.Errorf("no lockPath %v in shardLockPaths", lockPath) + } + delete(tee.shardLockPaths, lockPath) + + // unlock lockSecond, then lockFirst + serr := tee.lockSecond.UnlockShardForAction(keyspace, shard, sLockPath, results) + perr := tee.lockFirst.UnlockShardForAction(keyspace, shard, lockPath, results) + + if serr != nil { + if perr != nil { + relog.Warning("Secondary UnlockShardForAction(%v/%v, %v) failed: %v", keyspace, shard, sLockPath, serr) + } + return serr + } + return perr } +// // Remote Tablet Actions, local cell. +// +// TODO(alainjobart) implement the split func (tee *Tee) WriteTabletAction(tabletAlias topo.TabletAlias, contents string) (string, error) { return tee.primary.WriteTabletAction(tabletAlias, contents) } +// TODO(alainjobart) implement the split func (tee *Tee) WaitForTabletAction(actionPath string, waitTime time.Duration, interrupted chan struct{}) (string, error) { return tee.primary.WaitForTabletAction(actionPath, waitTime, interrupted) } +// TODO(alainjobart) implement the split func (tee *Tee) PurgeTabletActions(tabletAlias topo.TabletAlias, canBePurged func(data string) bool) error { return tee.primary.PurgeTabletActions(tabletAlias, canBePurged) } +// // Supporting the local agent process, local cell. +// +// TODO(alainjobart) implement the split func (tee *Tee) ValidateTabletActions(tabletAlias topo.TabletAlias) error { return tee.primary.ValidateTabletActions(tabletAlias) } +// TODO(alainjobart) implement the split func (tee *Tee) CreateTabletPidNode(tabletAlias topo.TabletAlias, done chan struct{}) error { return tee.primary.CreateTabletPidNode(tabletAlias, done) } +// TODO(alainjobart) implement the split func (tee *Tee) ValidateTabletPidNode(tabletAlias topo.TabletAlias) error { return tee.primary.ValidateTabletPidNode(tabletAlias) } func (tee *Tee) GetSubprocessFlags() []string { - return tee.primary.GetSubprocessFlags() + p := tee.primary.GetSubprocessFlags() + p = append(p, tee.secondary.GetSubprocessFlags()...) + return p } +// TODO(alainjobart) implement the split func (tee *Tee) ActionEventLoop(tabletAlias topo.TabletAlias, dispatchAction func(actionPath, data string) error, done chan struct{}) { tee.primary.ActionEventLoop(tabletAlias, dispatchAction, done) } +// TODO(alainjobart) implement the split func (tee *Tee) ReadTabletActionPath(actionPath string) (topo.TabletAlias, string, int, error) { return tee.primary.ReadTabletActionPath(actionPath) } +// TODO(alainjobart) implement the split func (tee *Tee) UpdateTabletAction(actionPath, data string, version int) error { return tee.primary.UpdateTabletAction(actionPath, data, version) } +// TODO(alainjobart) implement the split func (tee *Tee) StoreTabletActionResponse(actionPath, data string) error { return tee.primary.StoreTabletActionResponse(actionPath, data) } +// TODO(alainjobart) implement the split func (tee *Tee) UnblockTabletAction(actionPath string) error { return tee.primary.UnblockTabletAction(actionPath) } diff --git a/go/vt/topotools/tee_test.go b/go/vt/topotools/tee_test.go index 738c41e0bb5..646bea0beb9 100644 --- a/go/vt/topotools/tee_test.go +++ b/go/vt/topotools/tee_test.go @@ -38,7 +38,7 @@ func TestTee(t *testing.T) { CopyTablets(fromTS, toTS) // create a tee and check it implements the interface - tee := NewTee(fromTS, toTS) + tee := NewTee(fromTS, toTS, true) testConversion(tee) // create a keyspace, make sure it is on both sides From 39d120ba731f02ddaa13071935180e86c253a78e Mon Sep 17 00:00:00 2001 From: Alain Jobart Date: Fri, 19 Jul 2013 17:05:40 -0700 Subject: [PATCH 3/4] Renaming a file. --- go/vt/zktopo/{zk_topology_server.go => server.go} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename go/vt/zktopo/{zk_topology_server.go => server.go} (100%) diff --git a/go/vt/zktopo/zk_topology_server.go b/go/vt/zktopo/server.go similarity index 100% rename from go/vt/zktopo/zk_topology_server.go rename to go/vt/zktopo/server.go From 8969356b3ab4f20974fd9f2f177d532a3cdcb0af Mon Sep 17 00:00:00 2001 From: Alain Jobart Date: Fri, 19 Jul 2013 17:43:15 -0700 Subject: [PATCH 4/4] Updated these files from Ric's feedback. --- go/vt/topotools/copy.go | 40 +++++++++++++++++++------------------ go/vt/topotools/tee.go | 25 +++++++++++------------ go/vt/topotools/tee_test.go | 5 +---- 3 files changed, 34 insertions(+), 36 deletions(-) diff --git a/go/vt/topotools/copy.go b/go/vt/topotools/copy.go index 59ea14e6b5d..3b02b6af531 100644 --- a/go/vt/topotools/copy.go +++ b/go/vt/topotools/copy.go @@ -2,6 +2,8 @@ // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. +// topotools package contains a few utility classes to handle topo.Server +// objects, and transitions. package topotools import ( @@ -20,23 +22,23 @@ func CopyKeyspaces(fromTS, toTS topo.Server) { } wg := sync.WaitGroup{} - er := concurrency.AllErrorRecorder{} + rec := concurrency.AllErrorRecorder{} for _, keyspace := range keyspaces { wg.Add(1) go func(keyspace string) { defer wg.Done() if err := toTS.CreateKeyspace(keyspace); err != nil { if err == topo.ErrNodeExists { - relog.Warning("Keyspace %v already exists", keyspace) + relog.Warning("keyspace %v already exists", keyspace) } else { - er.RecordError(err) + rec.RecordError(err) } } }(keyspace) } wg.Wait() - if er.HasErrors() { - relog.Fatal("copyKeyspaces failed: %v", err) + if rec.HasErrors() { + relog.Fatal("copyKeyspaces failed: %v", rec.Error()) } } @@ -48,20 +50,20 @@ func CopyShards(fromTS, toTS topo.Server, deleteKeyspaceShards bool) { } wg := sync.WaitGroup{} - er := concurrency.AllErrorRecorder{} + rec := concurrency.AllErrorRecorder{} for _, keyspace := range keyspaces { wg.Add(1) go func(keyspace string) { defer wg.Done() shards, err := fromTS.GetShardNames(keyspace) if err != nil { - er.RecordError(err) + rec.RecordError(err) return } if deleteKeyspaceShards { if err := toTS.DeleteKeyspaceShards(keyspace); err != nil { - er.RecordError(err) + rec.RecordError(err) return } } @@ -72,9 +74,9 @@ func CopyShards(fromTS, toTS topo.Server, deleteKeyspaceShards bool) { defer wg.Done() if err := toTS.CreateShard(keyspace, shard); err != nil { if err == topo.ErrNodeExists { - relog.Warning("Shard %v/%v already exists", keyspace, shard) + relog.Warning("shard %v/%v already exists", keyspace, shard) } else { - er.RecordError(err) + rec.RecordError(err) } } }(keyspace, shard) @@ -82,8 +84,8 @@ func CopyShards(fromTS, toTS topo.Server, deleteKeyspaceShards bool) { }(keyspace) } wg.Wait() - if er.HasErrors() { - relog.Fatal("copyShards failed: %v", err) + if rec.HasErrors() { + relog.Fatal("copyShards failed: %v", rec.Error()) } } @@ -95,14 +97,14 @@ func CopyTablets(fromTS, toTS topo.Server) { } wg := sync.WaitGroup{} - er := concurrency.AllErrorRecorder{} + rec := concurrency.AllErrorRecorder{} for _, cell := range cells { wg.Add(1) go func(cell string) { defer wg.Done() tabletAliases, err := fromTS.GetTabletsByCell(cell) if err != nil { - er.RecordError(err) + rec.RecordError(err) } else { for _, tabletAlias := range tabletAliases { wg.Add(1) @@ -112,7 +114,7 @@ func CopyTablets(fromTS, toTS topo.Server) { // read the source tablet ti, err := fromTS.GetTablet(tabletAlias) if err != nil { - er.RecordError(err) + rec.RecordError(err) return } @@ -124,7 +126,7 @@ func CopyTablets(fromTS, toTS topo.Server) { } if err != nil { - er.RecordError(err) + rec.RecordError(err) return } @@ -132,7 +134,7 @@ func CopyTablets(fromTS, toTS topo.Server) { // for masters only here if ti.Type == topo.TYPE_MASTER { if err = toTS.CreateReplicationPath(ti.Keyspace, ti.Shard, ti.Alias().String()); err != nil && err != topo.ErrNodeExists { - er.RecordError(err) + rec.RecordError(err) } } }(tabletAlias) @@ -141,7 +143,7 @@ func CopyTablets(fromTS, toTS topo.Server) { }(cell) } wg.Wait() - if er.HasErrors() { - relog.Fatal("copyTablets failed: %v", er.Error()) + if rec.HasErrors() { + relog.Fatal("copyTablets failed: %v", rec.Error()) } } diff --git a/go/vt/topotools/tee.go b/go/vt/topotools/tee.go index 90418c217e6..f2a6296b01c 100644 --- a/go/vt/topotools/tee.go +++ b/go/vt/topotools/tee.go @@ -6,6 +6,7 @@ package topotools import ( "fmt" + "os" "time" "github.com/youtube/vitess/go/relog" @@ -21,7 +22,7 @@ import ( // - primary: we read everything from it, and write to it // - secondary: we write to it as well, but we usually don't fail. // - we lock primary/secondary if reverseLockOrder is False, -// or secondary/primary is reverseLockOrder is True. +// or secondary/primary if reverseLockOrder is True. type Tee struct { primary topo.Server secondary topo.Server @@ -78,7 +79,7 @@ func (tee *Tee) CreateKeyspace(keyspace string) error { return err } - // this is critical enough we want to fail + // this is critical enough that we want to fail if err := tee.secondary.CreateKeyspace(keyspace); err != nil { return err } @@ -430,38 +431,34 @@ func (tee *Tee) UnlockShardForAction(keyspace, shard, lockPath, results string) // // Remote Tablet Actions, local cell. +// TODO(alainjobart) implement the split // -// TODO(alainjobart) implement the split func (tee *Tee) WriteTabletAction(tabletAlias topo.TabletAlias, contents string) (string, error) { return tee.primary.WriteTabletAction(tabletAlias, contents) } -// TODO(alainjobart) implement the split func (tee *Tee) WaitForTabletAction(actionPath string, waitTime time.Duration, interrupted chan struct{}) (string, error) { return tee.primary.WaitForTabletAction(actionPath, waitTime, interrupted) } -// TODO(alainjobart) implement the split func (tee *Tee) PurgeTabletActions(tabletAlias topo.TabletAlias, canBePurged func(data string) bool) error { return tee.primary.PurgeTabletActions(tabletAlias, canBePurged) } // // Supporting the local agent process, local cell. +// TODO(alainjobart) implement the split // -// TODO(alainjobart) implement the split func (tee *Tee) ValidateTabletActions(tabletAlias topo.TabletAlias) error { return tee.primary.ValidateTabletActions(tabletAlias) } -// TODO(alainjobart) implement the split func (tee *Tee) CreateTabletPidNode(tabletAlias topo.TabletAlias, done chan struct{}) error { return tee.primary.CreateTabletPidNode(tabletAlias, done) } -// TODO(alainjobart) implement the split func (tee *Tee) ValidateTabletPidNode(tabletAlias topo.TabletAlias) error { return tee.primary.ValidateTabletPidNode(tabletAlias) } @@ -469,30 +466,32 @@ func (tee *Tee) ValidateTabletPidNode(tabletAlias topo.TabletAlias) error { func (tee *Tee) GetSubprocessFlags() []string { p := tee.primary.GetSubprocessFlags() p = append(p, tee.secondary.GetSubprocessFlags()...) + + // propagate the VT_TOPOLOGY_SERVER environment too + name := os.Getenv("VT_TOPOLOGY_SERVER") + if name != "" { + p = append(p, "VT_TOPOLOGY_SERVER=" + name) + } + return p } -// TODO(alainjobart) implement the split func (tee *Tee) ActionEventLoop(tabletAlias topo.TabletAlias, dispatchAction func(actionPath, data string) error, done chan struct{}) { tee.primary.ActionEventLoop(tabletAlias, dispatchAction, done) } -// TODO(alainjobart) implement the split func (tee *Tee) ReadTabletActionPath(actionPath string) (topo.TabletAlias, string, int, error) { return tee.primary.ReadTabletActionPath(actionPath) } -// TODO(alainjobart) implement the split func (tee *Tee) UpdateTabletAction(actionPath, data string, version int) error { return tee.primary.UpdateTabletAction(actionPath, data, version) } -// TODO(alainjobart) implement the split func (tee *Tee) StoreTabletActionResponse(actionPath, data string) error { return tee.primary.StoreTabletActionResponse(actionPath, data) } -// TODO(alainjobart) implement the split func (tee *Tee) UnblockTabletAction(actionPath string) error { return tee.primary.UnblockTabletAction(actionPath) } diff --git a/go/vt/topotools/tee_test.go b/go/vt/topotools/tee_test.go index 646bea0beb9..af55bfeba4c 100644 --- a/go/vt/topotools/tee_test.go +++ b/go/vt/topotools/tee_test.go @@ -15,9 +15,6 @@ import ( // "launchpad.net/gozk/zookeeper" ) -func testConversion(topo.Server) { -} - func testStringArrays(t *testing.T, expected, value []string) { if len(expected) != len(value) { t.Fatalf("Different arrays:\n%v\n%v", expected, value) @@ -39,7 +36,7 @@ func TestTee(t *testing.T) { // create a tee and check it implements the interface tee := NewTee(fromTS, toTS, true) - testConversion(tee) + var _ topo.Server = tee // create a keyspace, make sure it is on both sides if err := tee.CreateKeyspace("keyspace2"); err != nil {