Add option to run vtcombo and vttest local cluster with real topo server #9176

Merged 4 commits on Nov 17, 2021
6 changes: 3 additions & 3 deletions go/cmd/vtcombo/main.go
@@ -59,7 +59,7 @@ var (

mysqlPort = flag.Int("mysql_port", 3306, "mysql port")

remoteTopoServer = flag.Bool("remote_topo_server", false, "Should vtcombo use a remote topology server instead of starting its own in-memory topology server. "+
externalTopoServer = flag.Bool("external_topo_server", false, "Should vtcombo use an external topology server instead of starting its own in-memory topology server. "+
"If true, vtcombo will use the flags defined in topo/server.go to open topo server")

ts *topo.Server
@@ -138,9 +138,9 @@ func main() {
flag.Set("log_dir", "$VTDATAROOT/tmp")
}

if *remoteTopoServer {
if *externalTopoServer {
// Open topo server based on the command line flags defined at topo/server.go
// do not create cell info as it should be done by whoever sets up the remote topo server
// do not create cell info as it should be done by whoever sets up the external topo server
ts = topo.Open()
} else {
// Create topo server. We use a 'memorytopo' implementation.
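With -external_topo_server set, vtcombo opens whatever backend the generic topo flags from go/vt/topo/server.go describe instead of its in-memory topo. A minimal sketch of launching it that way, assuming a consul server at a placeholder address; the helper below is hypothetical, and a real invocation still needs the usual vtcombo flags such as -proto_topo and -mysql_port:

package main

import (
	"log"
	"os/exec"
)

// launchVtcombo (hypothetical) starts vtcombo against an external consul topo
// server: -external_topo_server enables the new code path, and the topo_*
// flags from go/vt/topo/server.go select the backend. Address and root are
// placeholders.
func launchVtcombo() (*exec.Cmd, error) {
	cmd := exec.Command("vtcombo",
		"-external_topo_server",
		"-topo_implementation", "consul",
		"-topo_global_server_address", "localhost:8500",
		"-topo_global_root", "consul_test/global",
		// ... remaining vtcombo flags (-proto_topo, -mysql_port, ...) go here.
	)
	if err := cmd.Start(); err != nil {
		return nil, err
	}
	log.Printf("vtcombo running, pid %d", cmd.Process.Pid)
	return cmd, nil
}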
8 changes: 4 additions & 4 deletions go/cmd/vttestserver/main.go
@@ -152,10 +152,10 @@ func init() {
flag.BoolVar(&config.EnableOnlineDDL, "enable_online_ddl", true, "Allow users to submit, review and control Online DDL")
flag.BoolVar(&config.EnableDirectDDL, "enable_direct_ddl", true, "Allow users to submit direct DDL statements")

// flags for using an actual topo implementation for vtcombo instead of in-memory topo. useful for test setup where topo server is shared across multiple vtcombo processes or other components
flag.StringVar(&config.RemoteTopoImplementation, "remote_topo_implementation", "", "the topology implementation to use for vtcombo process")
flag.StringVar(&config.RemoteTopoGlobalServerAddress, "remote_topo_global_server_address", "", "the address of the global topology server for vtcombo process")
flag.StringVar(&config.RemoteTopoGlobalRoot, "remote_topo_global_root", "", "the path of the global topology data in the global topology server for vtcombo process")
// flags for using an actual topo implementation for vtcombo instead of in-memory topo. useful for test setup where an external topo server is shared across multiple vtcombo processes or other components
flag.StringVar(&config.ExternalTopoImplementation, "external_topo_implementation", "", "the topology implementation to use for vtcombo process")
flag.StringVar(&config.ExternalTopoGlobalServerAddress, "external_topo_global_server_address", "", "the address of the global topology server for vtcombo process")
flag.StringVar(&config.ExternalTopoGlobalRoot, "external_topo_global_root", "", "the path of the global topology data in the global topology server for vtcombo process")
}

func (t *topoFlags) buildTopology() (*vttestpb.VTTestTopology, error) {
36 changes: 36 additions & 0 deletions go/cmd/vttestserver/vttestserver_test.go
@@ -27,6 +27,9 @@ import (
"testing"
"time"

"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo/test"

"google.golang.org/protobuf/encoding/protojson"

"vitess.io/vitess/go/sqltypes"
@@ -308,6 +311,39 @@ func TestMtlsAuthUnauthorizedFails(t *testing.T) {
assert.Contains(t, err.Error(), "code = Unauthenticated desc = client certificate not authorized")
}

func TestExternalTopoServerConsul(t *testing.T) {
// Start a single consul in the background.
cmd, configFilename, serverAddr := test.StartConsul(t, "")
defer func() {
// Log an error if the process cannot be killed
if err := cmd.Process.Kill(); err != nil {
Review thread on this defer block:
Member: if we use `consul agent -dev`, I think we could do `consul leave` for a graceful shutdown and wouldn't need to call os.Remove.
Contributor Author: tried `consul leave` and it adds 10 seconds for a graceful shutdown; keeping Process.Kill.
log.Errorf("cmd process kill has an error: %v", err)
}
// Log an error if waiting for the process fails
if err := cmd.Wait(); err != nil {
log.Errorf("cmd process wait has an error: %v", err)
}
os.Remove(configFilename)
}()

args := os.Args
conf := config
defer resetFlags(args, conf)

cluster, err := startCluster("-external_topo_implementation=consul",
fmt.Sprintf("-external_topo_global_server_address=%s", serverAddr), "-external_topo_global_root=consul_test/global")
defer cluster.TearDown()

assert.NoError(t, err)
assertColumnVindex(t, cluster, columnVindex{keyspace: "test_keyspace", table: "test_table", vindex: "my_vdx", vindexType: "hash", column: "id"})
assertColumnVindex(t, cluster, columnVindex{keyspace: "app_customer", table: "customers", vindex: "hash", vindexType: "hash", column: "id"})

// Add Hash vindex via vtgate execution on table
err = addColumnVindex(cluster, "test_keyspace", "alter vschema on test_table1 add vindex my_vdx (id)")
assert.NoError(t, err)
assertColumnVindex(t, cluster, columnVindex{keyspace: "test_keyspace", table: "test_table1", vindex: "my_vdx", vindexType: "hash", column: "id"})
}

func startPersistentCluster(dir string, flags ...string) (vttest.LocalCluster, error) {
flags = append(flags, []string{
"-persistent_mode",
100 changes: 4 additions & 96 deletions go/vt/topo/consultopo/server_flaky_test.go
@@ -17,10 +17,8 @@ limitations under the License.
package consultopo

import (
"encoding/json"
"fmt"
"os"
"os/exec"
"path"
"testing"
"time"
@@ -29,108 +27,18 @@ import (

"context"

"github.com/hashicorp/consul/api"

"vitess.io/vitess/go/testfiles"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/test"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

// startConsul starts a consul subprocess, and waits for it to be ready.
// Returns the exec.Cmd forked, the config file to remove after the test,
// and the server address to RPC-connect to.
func startConsul(t *testing.T, authToken string) (*exec.Cmd, string, string) {
// Create a temporary config file, as ports cannot all be set
// via command line. The file name has to end with '.json' so
// we're not using TempFile.
configDir, err := os.MkdirTemp("", "consul")
if err != nil {
t.Fatalf("cannot create temp dir: %v", err)
}
defer os.RemoveAll(configDir)

configFilename := path.Join(configDir, "consul.json")
configFile, err := os.OpenFile(configFilename, os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
t.Fatalf("cannot create tempfile: %v", err)
}

// Create the JSON config, save it.
port := testfiles.GoVtTopoConsultopoPort
config := map[string]interface{}{
"ports": map[string]int{
"dns": port,
"http": port + 1,
"serf_lan": port + 2,
"serf_wan": port + 3,
},
}

if authToken != "" {
config["datacenter"] = "vitess"
config["acl_datacenter"] = "vitess"
config["acl_master_token"] = authToken
config["acl_default_policy"] = "deny"
config["acl_down_policy"] = "extend-cache"
}

data, err := json.Marshal(config)
if err != nil {
t.Fatalf("cannot json-encode config: %v", err)
}
if _, err := configFile.Write(data); err != nil {
t.Fatalf("cannot write config: %v", err)
}
if err := configFile.Close(); err != nil {
t.Fatalf("cannot close config: %v", err)
}

cmd := exec.Command("consul",
"agent",
"-dev",
"-config-file", configFilename)
err = cmd.Start()
if err != nil {
t.Fatalf("failed to start consul: %v", err)
}

// Create a client to connect to the created consul.
serverAddr := fmt.Sprintf("localhost:%v", port+1)
cfg := api.DefaultConfig()
cfg.Address = serverAddr
if authToken != "" {
cfg.Token = authToken
}
c, err := api.NewClient(cfg)
if err != nil {
t.Fatalf("api.NewClient(%v) failed: %v", serverAddr, err)
}

// Wait until we can list "/", or timeout.
start := time.Now()
kv := c.KV()
for {
_, _, err := kv.List("/", nil)
if err == nil {
break
}
if time.Since(start) > 10*time.Second {
t.Fatalf("Failed to start consul daemon in time. Consul is returning error: %v", err)
}
time.Sleep(10 * time.Millisecond)
}

return cmd, configFilename, serverAddr
}

func TestConsulTopo(t *testing.T) {
// One test is going to wait that full period, so make it shorter.
*watchPollDuration = 100 * time.Millisecond

// Start a single consul in the background.
cmd, configFilename, serverAddr := startConsul(t, "")
cmd, configFilename, serverAddr := test.StartConsul(t, "")
defer func() {
// Alerts command did not run successful
if err := cmd.Process.Kill(); err != nil {
@@ -176,7 +84,7 @@ func TestConsulTopoWithChecks(t *testing.T) {
*consulLockSessionTTL = "15s"

// Start a single consul in the background.
cmd, configFilename, serverAddr := startConsul(t, "")
cmd, configFilename, serverAddr := test.StartConsul(t, "")
defer func() {
// Alerts command did not run successful
if err := cmd.Process.Kill(); err != nil {
@@ -220,7 +128,7 @@ func TestConsulTopoWithAuth(t *testing.T) {
*watchPollDuration = 100 * time.Millisecond

// Start a single consul in the background.
cmd, configFilename, serverAddr := startConsul(t, "123456")
cmd, configFilename, serverAddr := test.StartConsul(t, "123456")
defer func() {
// Alerts command did not run successful
if err := cmd.Process.Kill(); err != nil {
@@ -277,7 +185,7 @@ func TestConsulTopoWithAuthFailure(t *testing.T) {
*watchPollDuration = 100 * time.Millisecond

// Start a single consul in the background.
cmd, configFilename, serverAddr := startConsul(t, "123456")
cmd, configFilename, serverAddr := test.StartConsul(t, "123456")
defer func() {
cmd.Process.Kill()
cmd.Wait()
97 changes: 97 additions & 0 deletions go/vt/topo/test/testing.go
@@ -22,7 +22,17 @@ limitations under the License.
package test

import (
"encoding/json"
"fmt"
"os"
"os/exec"
"path"
"testing"
"time"

"github.com/hashicorp/consul/api"

"vitess.io/vitess/go/testfiles"

"vitess.io/vitess/go/vt/topo"

@@ -112,3 +122,90 @@ func TopoServerTestSuite(t *testing.T, factory func() *topo.Server) {
checkWatchInterrupt(t, ts)
ts.Close()
}

// StartConsul starts a consul subprocess, and waits for it to be ready.
// Returns the exec.Cmd forked, the config file to remove after the test,
// and the server address to RPC-connect to.
func StartConsul(t *testing.T, authToken string) (*exec.Cmd, string, string) {
// Create a temporary config file, as ports cannot all be set
// via command line. The file name has to end with '.json' so
// we're not using TempFile.
configDir, err := os.MkdirTemp("", "consul")
if err != nil {
t.Fatalf("cannot create temp dir: %v", err)
}
defer os.RemoveAll(configDir)

configFilename := path.Join(configDir, "consul.json")
configFile, err := os.OpenFile(configFilename, os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
t.Fatalf("cannot create tempfile: %v", err)
}

// Create the JSON config, save it.
port := testfiles.GoVtTopoConsultopoPort
config := map[string]interface{}{
"ports": map[string]int{
"dns": port,
"http": port + 1,
"serf_lan": port + 2,
"serf_wan": port + 3,
},
}

if authToken != "" {
config["datacenter"] = "vitess"
config["acl_datacenter"] = "vitess"
config["acl_master_token"] = authToken
config["acl_default_policy"] = "deny"
config["acl_down_policy"] = "extend-cache"
}

data, err := json.Marshal(config)
if err != nil {
t.Fatalf("cannot json-encode config: %v", err)
}
if _, err := configFile.Write(data); err != nil {
t.Fatalf("cannot write config: %v", err)
}
if err := configFile.Close(); err != nil {
t.Fatalf("cannot close config: %v", err)
}

cmd := exec.Command("consul",
"agent",
"-dev",
"-config-file", configFilename)
err = cmd.Start()
if err != nil {
t.Fatalf("failed to start consul: %v", err)
}

// Create a client to connect to the created consul.
serverAddr := fmt.Sprintf("localhost:%v", port+1)
cfg := api.DefaultConfig()
cfg.Address = serverAddr
if authToken != "" {
cfg.Token = authToken
}
c, err := api.NewClient(cfg)
if err != nil {
t.Fatalf("api.NewClient(%v) failed: %v", serverAddr, err)
}

// Wait until we can list "/", or timeout.
start := time.Now()
kv := c.KV()
for {
_, _, err := kv.List("/", nil)
if err == nil {
break
}
if time.Since(start) > 10*time.Second {
t.Fatalf("Failed to start consul daemon in time. Consul is returning error: %v", err)
}
time.Sleep(10 * time.Millisecond)
}

return cmd, configFilename, serverAddr
}
14 changes: 7 additions & 7 deletions go/vt/vttest/local_cluster.go
@@ -127,11 +127,11 @@ type Config struct {
EnableDirectDDL bool

// Allow users to start a local cluster using a remote topo server
RemoteTopoImplementation string
ExternalTopoImplementation string

RemoteTopoGlobalServerAddress string
ExternalTopoGlobalServerAddress string

RemoteTopoGlobalRoot string
ExternalTopoGlobalRoot string
}

// InitSchemas is a shortcut for tests that just want to setup a single
@@ -246,11 +246,11 @@ func (db *LocalCluster) Setup() error {
log.Infof("LocalCluster environment: %+v", db.Env)

// Set up topo manager if we are using a remote topo server
if db.RemoteTopoImplementation != "" {
db.topo = db.Env.TopoManager(db.RemoteTopoImplementation, db.RemoteTopoGlobalServerAddress, db.RemoteTopoGlobalRoot, db.Topology)
log.Infof("Initializing Topo Manager (%T)...", db.topo)
if db.ExternalTopoImplementation != "" {
db.topo = db.Env.TopoManager(db.ExternalTopoImplementation, db.ExternalTopoGlobalServerAddress, db.ExternalTopoGlobalRoot, db.Topology)
log.Infof("Initializing Topo Manager: %+v", db.topo)
if err := db.topo.Setup(); err != nil {
log.Errorf("Failed to set up Topo Manager: %s", err)
log.Errorf("Failed to set up Topo Manager: %v", err)
return err
}
}
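For callers that embed vttest directly instead of going through vttestserver flags, the renamed fields are set on the Config. A minimal sketch, assuming a consul server is already running at a placeholder address; a real config would also populate Topology, SchemaDir, and the other existing fields:

package main

import (
	"log"

	"vitess.io/vitess/go/vt/vttest"
)

// runWithExternalTopo (hypothetical) wires a LocalCluster to a shared
// external topo server via the renamed Config fields. Address and root are
// placeholders.
func runWithExternalTopo() error {
	cluster := vttest.LocalCluster{
		Config: vttest.Config{
			ExternalTopoImplementation:      "consul",
			ExternalTopoGlobalServerAddress: "localhost:8500",
			ExternalTopoGlobalRoot:          "consul_test/global",
		},
	}
	if err := cluster.Setup(); err != nil {
		return err
	}
	defer func() {
		if err := cluster.TearDown(); err != nil {
			log.Printf("teardown error: %v", err)
		}
	}()
	// ... run queries against the cluster here ...
	return nil
}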
10 changes: 6 additions & 4 deletions go/vt/vttest/topoctl.go
@@ -36,7 +36,7 @@ func (ctl *Topoctl) Setup() error {
// Create cells if it doesn't exist to be idempotent. Should work when we share the same topo server across multiple local clusters.
for _, cell := range ctl.Topology.Cells {
_, err := topoServer.GetCellInfo(ctx, cell, true)
// Cell already exists. no-op
// Cell info already exists. no-op
if err == nil {
continue
}
@@ -46,9 +46,11 @@
return err
}

// try to create the cell that doesn't exist
// Use empty cell info till we have a use case to set up local cell properly
err = topoServer.CreateCellInfo(ctx, cell, &topodatapb.CellInfo{})
// Use the same topo server address in the cell info, or else talking to the local cell topo would fail
// Use a dummy (empty) cell root until we have a use case for setting up local cells properly
cellInfo := &topodatapb.CellInfo{ServerAddress: ctl.TopoGlobalServerAddress}

err = topoServer.CreateCellInfo(ctx, cell, cellInfo)
if err != nil {
return err
}
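The idempotent cell handling above is what allows several local clusters to share one external topo server: existing cell infos are left untouched, and missing ones are created pointing back at the same global server address. The standalone helper below is a hypothetical restatement of that pattern, not code from this PR:

package main

import (
	"context"

	"vitess.io/vitess/go/vt/topo"

	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

// ensureCell (hypothetical) mirrors Topoctl.Setup: leave an existing cell
// info alone, otherwise create it with the global topo server address so
// local-cell lookups resolve against the same server.
func ensureCell(ctx context.Context, ts *topo.Server, cell, globalAddr string) error {
	if _, err := ts.GetCellInfo(ctx, cell, true /* strongRead */); err == nil {
		return nil // cell info already exists; no-op
	} else if !topo.IsErrType(err, topo.NoNode) {
		return err // unexpected error, surface it
	}
	return ts.CreateCellInfo(ctx, cell, &topodatapb.CellInfo{ServerAddress: globalAddr})
}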