Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[9.0] Fix for Query Serving when Toposerver is Down #8046

Merged
merged 8 commits into from
May 6, 2021
Merged
13 changes: 10 additions & 3 deletions go/test/endtoend/cluster/cluster_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type LocalProcessCluster struct {
Cell string
BaseTabletUID int
Hostname string
TopoFlavor string
TopoPort int
TmpDirectory string
OriginalVTDATAROOT string
Expand Down Expand Up @@ -173,6 +174,8 @@ func (cluster *LocalProcessCluster) StartTopo() (err error) {
if cluster.Cell == "" {
cluster.Cell = DefaultCell
}

topoFlavor = cluster.TopoFlavorString()
cluster.TopoPort = cluster.GetAndReservePort()
cluster.TmpDirectory = path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/tmp_%d", cluster.GetAndReservePort()))
cluster.TopoProcess = *TopoProcessInstance(cluster.TopoPort, cluster.GetAndReservePort(), cluster.Hostname, *topoFlavor, "global")
Expand Down Expand Up @@ -758,9 +761,13 @@ func (cluster *LocalProcessCluster) StartVttablet(tablet *Vttablet, servingStatu
return tablet.VttabletProcess.Setup()
}

//func (cluster *LocalProcessCluster) NewOrcInstance() OrchestratorProcess {
//
//}
// TopoFlavorString returns the topo flavor
func (cluster *LocalProcessCluster) TopoFlavorString() *string {
if cluster.TopoFlavor != "" {
return &cluster.TopoFlavor
}
return topoFlavor
}

func getCoveragePath(fileName string) string {
covDir := os.Getenv("COV_DIR")
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/cluster/topo_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func (topo *TopoProcess) TearDown(Cell string, originalVtRoot string, currentRoo
// Attempt graceful shutdown with SIGTERM first
_ = topo.proc.Process.Signal(syscall.SIGTERM)

if !*keepData {
if !(*keepData || keepdata) {
_ = os.RemoveAll(topo.DataDirectory)
_ = os.RemoveAll(currentRoot)
}
Expand Down
148 changes: 148 additions & 0 deletions go/test/endtoend/topotest/consul/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
Copyright 2021 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package consul

import (
"context"
"flag"
"fmt"
"os"
"testing"
"time"

"vitess.io/vitess/go/vt/log"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/endtoend/cluster"
)

var (
clusterInstance *cluster.LocalProcessCluster
cell = "zone1"
hostname = "localhost"
KeyspaceName = "customer"
SchemaSQL = `
CREATE TABLE t1 (
c1 BIGINT NOT NULL,
c2 BIGINT NOT NULL,
c3 BIGINT,
c4 varchar(100),
PRIMARY KEY (c1),
UNIQUE KEY (c2),
UNIQUE KEY (c3),
UNIQUE KEY (c4)
) ENGINE=Innodb;`
VSchema = `
{
"sharded": false,
"tables": {
"t1": {}
}
}
`
)

func TestMain(m *testing.M) {
defer cluster.PanicHandler(nil)
flag.Parse()

exitCode := func() int {
clusterInstance = cluster.NewCluster(cell, hostname)
defer clusterInstance.Teardown()

// Start topo server
clusterInstance.TopoFlavor = "consul"
if err := clusterInstance.StartTopo(); err != nil {
return 1
}

// Start keyspace
Keyspace := &cluster.Keyspace{
Name: KeyspaceName,
SchemaSQL: SchemaSQL,
VSchema: VSchema,
}
if err := clusterInstance.StartUnshardedKeyspace(*Keyspace, 0, false); err != nil {
log.Fatal(err.Error())
return 1
}

// Start vtgate
if err := clusterInstance.StartVtgate(); err != nil {
log.Fatal(err.Error())
return 1
}

return m.Run()
}()
os.Exit(exitCode)
}

func TestTopoDownServingQuery(t *testing.T) {
defer cluster.PanicHandler(t)
ctx := context.Background()
vtParams := mysql.ConnParams{
Host: "localhost",
Port: clusterInstance.VtgateMySQLPort,
}
conn, err := mysql.Connect(ctx, &vtParams)
require.Nil(t, err)
defer conn.Close()

defer exec(t, conn, `delete from t1`)

execMulti(t, conn, `insert into t1(c1, c2, c3, c4) values (300,100,300,'abc'); ;; insert into t1(c1, c2, c3, c4) values (301,101,301,'abcd');;`)
assertMatches(t, conn, `select c1,c2,c3 from t1`, `[[INT64(300) INT64(100) INT64(300)] [INT64(301) INT64(101) INT64(301)]]`)
clusterInstance.TopoProcess.TearDown(clusterInstance.Cell, clusterInstance.OriginalVTDATAROOT, clusterInstance.CurrentVTDATAROOT, true, *clusterInstance.TopoFlavorString())
time.Sleep(3 * time.Second)
assertMatches(t, conn, `select c1,c2,c3 from t1`, `[[INT64(300) INT64(100) INT64(300)] [INT64(301) INT64(101) INT64(301)]]`)
}

func exec(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result {
t.Helper()
qr, err := conn.ExecuteFetch(query, 1000, true)
require.NoError(t, err)
return qr
}

func execMulti(t *testing.T, conn *mysql.Conn, query string) []*sqltypes.Result {
t.Helper()
var res []*sqltypes.Result
qr, more, err := conn.ExecuteFetchMulti(query, 1000, true)
res = append(res, qr)
require.NoError(t, err)
for more == true {
qr, more, _, err = conn.ReadQueryResult(1000, true)
require.NoError(t, err)
res = append(res, qr)
}
return res
}

func assertMatches(t *testing.T, conn *mysql.Conn, query, expected string) {
t.Helper()
qr := exec(t, conn, query)
got := fmt.Sprintf("%v", qr.Rows)
diff := cmp.Diff(expected, got)
if diff != "" {
t.Errorf("Query: %s (-want +got):\n%s", query, diff)
}
}
147 changes: 147 additions & 0 deletions go/test/endtoend/topotest/etcd2/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
Copyright 2021 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ectd2

import (
"context"
"flag"
"fmt"
"os"
"testing"
"time"

"vitess.io/vitess/go/vt/log"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/endtoend/cluster"
)

var (
clusterInstance *cluster.LocalProcessCluster
cell = "zone1"
hostname = "localhost"
KeyspaceName = "customer"
SchemaSQL = `
CREATE TABLE t1 (
c1 BIGINT NOT NULL,
c2 BIGINT NOT NULL,
c3 BIGINT,
c4 varchar(100),
PRIMARY KEY (c1),
UNIQUE KEY (c2),
UNIQUE KEY (c3),
UNIQUE KEY (c4)
) ENGINE=Innodb;`
VSchema = `
{
"sharded": false,
"tables": {
"t1": {}
}
}
`
)

func TestMain(m *testing.M) {
defer cluster.PanicHandler(nil)
flag.Parse()

exitCode := func() int {
clusterInstance = cluster.NewCluster(cell, hostname)
defer clusterInstance.Teardown()

// Start topo server
if err := clusterInstance.StartTopo(); err != nil {
return 1
}

// Start keyspace
Keyspace := &cluster.Keyspace{
Name: KeyspaceName,
SchemaSQL: SchemaSQL,
VSchema: VSchema,
}
if err := clusterInstance.StartUnshardedKeyspace(*Keyspace, 0, false); err != nil {
log.Fatal(err.Error())
return 1
}

// Start vtgate
if err := clusterInstance.StartVtgate(); err != nil {
log.Fatal(err.Error())
return 1
}

return m.Run()
}()
os.Exit(exitCode)
}

func TestTopoDownServingQuery(t *testing.T) {
defer cluster.PanicHandler(t)
ctx := context.Background()
vtParams := mysql.ConnParams{
Host: "localhost",
Port: clusterInstance.VtgateMySQLPort,
}
conn, err := mysql.Connect(ctx, &vtParams)
require.Nil(t, err)
defer conn.Close()

defer exec(t, conn, `delete from t1`)

execMulti(t, conn, `insert into t1(c1, c2, c3, c4) values (300,100,300,'abc'); ;; insert into t1(c1, c2, c3, c4) values (301,101,301,'abcd');;`)
assertMatches(t, conn, `select c1,c2,c3 from t1`, `[[INT64(300) INT64(100) INT64(300)] [INT64(301) INT64(101) INT64(301)]]`)
clusterInstance.TopoProcess.TearDown(clusterInstance.Cell, clusterInstance.OriginalVTDATAROOT, clusterInstance.CurrentVTDATAROOT, true, *clusterInstance.TopoFlavorString())
time.Sleep(3 * time.Second)
assertMatches(t, conn, `select c1,c2,c3 from t1`, `[[INT64(300) INT64(100) INT64(300)] [INT64(301) INT64(101) INT64(301)]]`)
}

func exec(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result {
t.Helper()
qr, err := conn.ExecuteFetch(query, 1000, true)
require.NoError(t, err)
return qr
}

func execMulti(t *testing.T, conn *mysql.Conn, query string) []*sqltypes.Result {
t.Helper()
var res []*sqltypes.Result
qr, more, err := conn.ExecuteFetchMulti(query, 1000, true)
res = append(res, qr)
require.NoError(t, err)
for more == true {
qr, more, _, err = conn.ReadQueryResult(1000, true)
require.NoError(t, err)
res = append(res, qr)
}
return res
}

func assertMatches(t *testing.T, conn *mysql.Conn, query, expected string) {
t.Helper()
qr := exec(t, conn, query)
got := fmt.Sprintf("%v", qr.Rows)
diff := cmp.Diff(expected, got)
if diff != "" {
t.Errorf("Query: %s (-want +got):\n%s", query, diff)
}
}
Loading