Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vertical split testcase migrated in go #46

Merged
merged 6 commits into from
Dec 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions go/test/endtoend/cellalias/cell_alias_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,11 +289,11 @@ func TestAlias(t *testing.T) {

func waitTillAllTabletsAreHealthyInVtgate(t *testing.T, vtgateInstance cluster.VtgateProcess, shards ...string) {
for _, shard := range shards {
err := vtgateInstance.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.master", keyspaceName, shard))
err := vtgateInstance.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.master", keyspaceName, shard), 1)
assert.Nil(t, err)
err = vtgateInstance.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", keyspaceName, shard))
err = vtgateInstance.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", keyspaceName, shard), 1)
assert.Nil(t, err)
err = vtgateInstance.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.rdonly", keyspaceName, shard))
err = vtgateInstance.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.rdonly", keyspaceName, shard), 1)
assert.Nil(t, err)
}
}
Expand Down
6 changes: 3 additions & 3 deletions go/test/endtoend/cluster/cluster_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,10 +423,10 @@ func (cluster *LocalProcessCluster) WaitForTabletsToHealthyInVtgate() (err error
for _, keyspace := range cluster.Keyspaces {
for _, shard := range keyspace.Shards {
isRdOnlyPresent = false
if err = cluster.VtgateProcess.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.master", keyspace.Name, shard.Name)); err != nil {
if err = cluster.VtgateProcess.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.master", keyspace.Name, shard.Name), 1); err != nil {
return err
}
if err = cluster.VtgateProcess.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", keyspace.Name, shard.Name)); err != nil {
if err = cluster.VtgateProcess.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", keyspace.Name, shard.Name), 1); err != nil {
return err
}
for _, tablet := range shard.Vttablets {
Expand All @@ -435,7 +435,7 @@ func (cluster *LocalProcessCluster) WaitForTabletsToHealthyInVtgate() (err error
}
}
if isRdOnlyPresent {
err = cluster.VtgateProcess.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.rdonly", keyspace.Name, shard.Name))
err = cluster.VtgateProcess.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.rdonly", keyspace.Name, shard.Name), 1)
}
if err != nil {
return err
Expand Down
32 changes: 27 additions & 5 deletions go/test/endtoend/cluster/vtgate_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"os/exec"
"path"
"reflect"
"strconv"
"strings"
"syscall"
"time"
Expand Down Expand Up @@ -131,7 +132,8 @@ func (vtgate *VtgateProcess) WaitForStatus() bool {
}

// GetStatusForTabletOfShard function gets status for a specific tablet of a shard in keyspace
func (vtgate *VtgateProcess) GetStatusForTabletOfShard(name string) bool {
// endPointsCount : number of endpoints
func (vtgate *VtgateProcess) GetStatusForTabletOfShard(name string, endPointsCount int) bool {
resp, err := http.Get(vtgate.VerifyURL)
if err != nil {
return false
Expand All @@ -149,9 +151,9 @@ func (vtgate *VtgateProcess) GetStatusForTabletOfShard(name string) bool {
for _, key := range object.MapKeys() {
if key.String() == name {
value := fmt.Sprintf("%v", object.MapIndex(key))
return value == "1"
countStr := strconv.Itoa(endPointsCount)
return value == countStr
}

}
}
return masterConnectionExist
Expand All @@ -160,10 +162,11 @@ func (vtgate *VtgateProcess) GetStatusForTabletOfShard(name string) bool {
}

// WaitForStatusOfTabletInShard function waits till status of a tablet in shard is 1
func (vtgate *VtgateProcess) WaitForStatusOfTabletInShard(name string) error {
// endPointsCount: how many endpoints to wait for
func (vtgate *VtgateProcess) WaitForStatusOfTabletInShard(name string, endPointsCount int) error {
timeout := time.Now().Add(10 * time.Second)
for time.Now().Before(timeout) {
if vtgate.GetStatusForTabletOfShard(name) {
if vtgate.GetStatusForTabletOfShard(name, endPointsCount) {
return nil
}
select {
Expand Down Expand Up @@ -226,3 +229,22 @@ func VtgateProcessInstance(port int, grpcPort int, mySQLServerPort int, cell str

return vtgate
}

// GetVars returns map of vars
func (vtgate *VtgateProcess) GetVars() (map[string]interface{}, error) {
resultMap := make(map[string]interface{})
resp, err := http.Get(vtgate.VerifyURL)
if err != nil {
return nil, fmt.Errorf("error getting response from %s", vtgate.VerifyURL)
}
if resp.StatusCode == 200 {
respByte, _ := ioutil.ReadAll(resp.Body)
err := json.Unmarshal(respByte, &resultMap)
if err != nil {
return nil, fmt.Errorf("not able to parse response body")
}
return resultMap, nil
} else {
return nil, fmt.Errorf("unsuccessful response")
}
}
1 change: 1 addition & 0 deletions go/test/endtoend/mysqlctl/mysqlctl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func initCluster(shardNames []string, totalTabletsRequired int) {
clusterInstance.VtTabletExtraArgs,
clusterInstance.EnableSemiSync)
tablet.Alias = tablet.VttabletProcess.TabletPath

shard.Vttablets = append(shard.Vttablets, tablet)
}
for _, proc := range mysqlCtlProcessList {
Expand Down
1 change: 1 addition & 0 deletions go/test/endtoend/sharded/sharded_keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ func initCluster(shardNames []string, totalTabletsRequired int) {
clusterInstance.VtTabletExtraArgs,
clusterInstance.EnableSemiSync)
tablet.Alias = tablet.VttabletProcess.TabletPath

shard.Vttablets = append(shard.Vttablets, tablet)
}
for _, proc := range mysqlCtlProcessList {
Expand Down
17 changes: 11 additions & 6 deletions go/test/endtoend/sharding/base_sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,18 +239,23 @@ func checkStreamHealthEqualsBinlogPlayerVars(t *testing.T, vttablet cluster.Vtta
}

// CheckBinlogServerVars checks the binlog server variables are correctly exported.
func CheckBinlogServerVars(t *testing.T, vttablet cluster.Vttablet, minStatement int, minTxn int) {
func CheckBinlogServerVars(t *testing.T, vttablet cluster.Vttablet, minStatement int, minTxn int, isVerticalSplit bool) {
resultMap := vttablet.VttabletProcess.GetVars()
assert.Contains(t, resultMap, "UpdateStreamKeyRangeStatements")
assert.Contains(t, resultMap, "UpdateStreamKeyRangeTransactions")
skey := "UpdateStreamKeyRangeStatements"
tkey := "UpdateStreamKeyRangeTransactions"
if isVerticalSplit {
skey = "UpdateStreamTablesStatements"
tkey = "UpdateStreamTablesTransactions"
}
assert.Contains(t, resultMap, skey)
assert.Contains(t, resultMap, tkey)
if minStatement > 0 {
value := fmt.Sprintf("%v", reflect.ValueOf(resultMap["UpdateStreamKeyRangeStatements"]))
value := fmt.Sprintf("%v", reflect.ValueOf(resultMap[skey]))
iValue, _ := strconv.Atoi(value)
assert.True(t, iValue >= minStatement, fmt.Sprintf("only got %d < %d statements", iValue, minStatement))
}

if minTxn > 0 {
value := fmt.Sprintf("%v", reflect.ValueOf(resultMap["UpdateStreamKeyRangeStatements"]))
value := fmt.Sprintf("%v", reflect.ValueOf(resultMap[tkey]))
iValue, _ := strconv.Atoi(value)
assert.True(t, iValue >= minTxn, fmt.Sprintf("only got %d < %d transactions", iValue, minTxn))
}
Expand Down
17 changes: 8 additions & 9 deletions go/test/endtoend/sharding/initialsharding/sharding_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ func TestInitialSharding(t *testing.T, keyspace *cluster.Keyspace, keyType query
assert.Nil(t, err)

for _, tabletType := range []string{"master", "replica", "rdonly"} {
if err = vtgateInstance.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.%s", keyspaceName, shard1.Name, tabletType)); err != nil {
if err = vtgateInstance.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.%s", keyspaceName, shard1.Name, tabletType), 1); err != nil {
assert.Fail(t, err.Error())
}
}
Expand Down Expand Up @@ -400,12 +400,11 @@ func TestInitialSharding(t *testing.T, keyspace *cluster.Keyspace, keyType query

// Wait for the endpoints, either local or remote.
for _, shard := range []cluster.Shard{shard1, shard21, shard22} {

err = vtgateInstance.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.master", keyspaceName, shard.Name))
err = vtgateInstance.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.master", keyspaceName, shard.Name), 1)
assert.Nil(t, err)
err = vtgateInstance.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", keyspaceName, shard.Name))
err = vtgateInstance.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", keyspaceName, shard.Name), 1)
assert.Nil(t, err)
err = vtgateInstance.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.rdonly", keyspaceName, shard.Name))
err = vtgateInstance.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.rdonly", keyspaceName, shard.Name), 1)
assert.Nil(t, err)
}

Expand Down Expand Up @@ -493,7 +492,7 @@ func TestInitialSharding(t *testing.T, keyspace *cluster.Keyspace, keyType query
sharding.CheckDestinationMaster(t, *shard22.MasterTablet(), []string{shard1Ks}, *ClusterInstance)

// check that binlog server exported the stats vars
sharding.CheckBinlogServerVars(t, *shard1.Replica(), 0, 0)
sharding.CheckBinlogServerVars(t, *shard1.Replica(), 0, 0, false)

for _, tablet := range []cluster.Vttablet{*shard21.Rdonly(), *shard22.Rdonly()} {
err = ClusterInstance.VtctlclientProcess.ExecuteCommand("RunHealthCheck", tablet.Alias)
Expand All @@ -510,7 +509,7 @@ func TestInitialSharding(t *testing.T, keyspace *cluster.Keyspace, keyType query

sharding.CheckDestinationMaster(t, *shard21.MasterTablet(), []string{shard1Ks}, *ClusterInstance)
sharding.CheckDestinationMaster(t, *shard22.MasterTablet(), []string{shard1Ks}, *ClusterInstance)
sharding.CheckBinlogServerVars(t, *shard1.Replica(), 1000, 1000)
sharding.CheckBinlogServerVars(t, *shard1.Replica(), 1000, 1000, false)

err = ClusterInstance.VtctlclientProcess.ExecuteCommand("RunHealthCheck", shard21.Rdonly().Alias)
assert.Nil(t, err)
Expand Down Expand Up @@ -558,8 +557,8 @@ func TestInitialSharding(t *testing.T, keyspace *cluster.Keyspace, keyType query
_ = shard21.Rdonly().VttabletProcess.WaitForTabletType("SERVING")
_ = shard22.Rdonly().VttabletProcess.WaitForTabletType("SERVING")

_ = vtgateInstance.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.rdonly", keyspaceName, shard21.Name))
_ = vtgateInstance.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.rdonly", keyspaceName, shard22.Name))
_ = vtgateInstance.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.rdonly", keyspaceName, shard21.Name), 1)
_ = vtgateInstance.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.rdonly", keyspaceName, shard22.Name), 1)

//then serve replica from the split shards

Expand Down
8 changes: 5 additions & 3 deletions go/test/endtoend/sharding/resharding/resharding_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ func TestResharding(t *testing.T, useVarbinaryShardingKeyType bool) {
assert.Nil(t, err)

// check that binlog server exported the stats vars
sharding.CheckBinlogServerVars(t, *shard1Replica1, 0, 0)
sharding.CheckBinlogServerVars(t, *shard1Replica1, 0, 0, false)

// Check that the throttler was enabled.
// The stream id is hard-coded as 1, which is the first id generated through auto-inc.
Expand All @@ -554,7 +554,8 @@ func TestResharding(t *testing.T, useVarbinaryShardingKeyType bool) {
checkMultiShardValues(t, keyspaceName, shardingKeyType)
sharding.CheckBinlogPlayerVars(t, *shard2Master, []string{shard1Ks}, 30)
sharding.CheckBinlogPlayerVars(t, *shard3Master, []string{shard1Ks}, 30)
sharding.CheckBinlogServerVars(t, *shard1Replica1, 100, 100)

sharding.CheckBinlogServerVars(t, *shard1Replica1, 100, 100, false)

// use vtworker to compare the data (after health-checking the destination
// rdonly tablets so discovery works)
Expand Down Expand Up @@ -613,7 +614,8 @@ func TestResharding(t *testing.T, useVarbinaryShardingKeyType bool) {
insertLots(100, 100, *shard1Master, tableName, fixedParentID, keyspaceName)
log.Debug("Checking 100 percent of data was sent quickly")
assert.True(t, checkLotsTimeout(t, 100, 100, tableName, keyspaceName, shardingKeyType))
sharding.CheckBinlogServerVars(t, *shard1Replica2, 80, 80)

sharding.CheckBinlogServerVars(t, *shard1Replica2, 80, 80, false)

// check we can't migrate the master just yet
err = clusterInstance.VtctlclientProcess.ExecuteCommand("MigrateServedTypes", shard1Ks, "master")
Expand Down
Loading