diff --git a/.github/ISSUE_TEMPLATE/question.md b/.github/ISSUE_TEMPLATE/question.md index b338743733e..57c62f5ea1a 100644 --- a/.github/ISSUE_TEMPLATE/question.md +++ b/.github/ISSUE_TEMPLATE/question.md @@ -6,4 +6,4 @@ about: If you have a question, please check out our other community resources in Issues on GitHub are intended to be related to bugs or feature requests, so we recommend using our other community resources instead of asking here. - [Vitess User Guide](https://vitess.io/user-guide/introduction/) -- Any other questions can be asked in the community [Slack workspace](https://bit.ly/vitess-slack) +- Any other questions can be asked in the community [Slack workspace](https://join.slack.com/t/vitess/shared_invite/enQtMzIxMDMyMzA0NzA1LTYxMjk2M2M2NjAwNGY0ODljY2E1MjBlZjRkMmZmNDVkZTBhNDUxNzNkOGM4YmEzNWEwOTE2NjJiY2QyZjZjYTE) diff --git a/.github/workflows/e2e-test-cluster.yml b/.github/workflows/e2e-test-cluster.yml new file mode 100644 index 00000000000..969008305ef --- /dev/null +++ b/.github/workflows/e2e-test-cluster.yml @@ -0,0 +1,41 @@ +name: e2e Test Cluster +on: [push, pull_request] +jobs: + + build: + name: Build + runs-on: ubuntu-latest + steps: + + - name: Set up Go + uses: actions/setup-go@v1 + with: + go-version: 1.13 + + - name: Check out code + uses: actions/checkout@v1 + + - name: Get dependencies + run: | + sudo apt-get install -y mysql-server mysql-client make unzip g++ etcd curl git wget + sudo service mysql stop + sudo service etcd stop + sudo ln -s /etc/apparmor.d/usr.sbin.mysqld /etc/apparmor.d/disable/ + sudo apparmor_parser -R /etc/apparmor.d/usr.sbin.mysqld + go mod download + + - name: Run bootstrap.sh + run: | + echo "Copying new bootstrap over location of legacy one." + cp .github/bootstrap.sh . + ./bootstrap.sh + + - name: Build + run: | + GOBIN=$PWD/bin make build + + - name: Run e2e test cluster + run: | + export PATH=$PWD/bin:$PATH + source ./dev.env + VTDATAROOT=/tmp/vtdataroot VTTOP=$PWD VTROOT=$PWD tools/e2e_test_cluster.sh diff --git a/Makefile b/Makefile index d0581272a65..24eab88c988 100644 --- a/Makefile +++ b/Makefile @@ -104,6 +104,9 @@ unit_test_race: build e2e_test_race: build tools/e2e_test_race.sh +e2e_test_cluster: build + tools/e2e_test_cluster.sh + .ONESHELL: SHELL = /bin/bash diff --git a/README.md b/README.md index f8b14b0c126..8590441e8c6 100644 --- a/README.md +++ b/README.md @@ -30,7 +30,7 @@ Ask questions in the discussion forum. For topics that are better discussed live, please join the -[Vitess Slack](https://bit.ly/vitess-slack) workspace. +[Vitess Slack](https://join.slack.com/t/vitess/shared_invite/enQtMzIxMDMyMzA0NzA1LTYxMjk2M2M2NjAwNGY0ODljY2E1MjBlZjRkMmZmNDVkZTBhNDUxNzNkOGM4YmEzNWEwOTE2NjJiY2QyZjZjYTE) workspace. Subscribe to [vitess-announce@googlegroups.com](https://groups.google.com/forum/#!forum/vitess-announce) diff --git a/config/mycnf/master_mariadb104.cnf b/config/mycnf/master_mariadb104.cnf new file mode 100644 index 00000000000..a144f352561 --- /dev/null +++ b/config/mycnf/master_mariadb104.cnf @@ -0,0 +1,23 @@ +# This file is auto-included when MariaDB 10.4 is detected. + +# enable strict mode so it's safe to compare sequence numbers across different server IDs. +gtid_strict_mode = 1 +innodb_stats_persistent = 0 + +# Semi-sync replication is required for automated unplanned failover +# (when the master goes away). Here we just load the plugin so it's +# available if desired, but it's disabled at startup. +# +# If the -enable_semi_sync flag is used, VTTablet will enable semi-sync +# at the proper time when replication is set up, or when masters are +# promoted or demoted. + +# semi_sync has been merged into master as of mariadb 10.3 so this is no longer needed +#plugin-load = rpl_semi_sync_master=semisync_master.so;rpl_semi_sync_slave=semisync_slave.so + +# When semi-sync is enabled, don't allow fallback to async +# if you get no ack, or have no slaves. This is necessary to +# prevent alternate futures when doing a failover in response to +# a master that becomes unresponsive. +rpl_semi_sync_master_timeout = 1000000000000000000 +rpl_semi_sync_master_wait_no_slave = 1 diff --git a/dev.env b/dev.env index 1c0d1ebdd59..54dd9cb41b9 100644 --- a/dev.env +++ b/dev.env @@ -77,6 +77,12 @@ fi PKG_CONFIG_PATH=$(prepend_path "$PKG_CONFIG_PATH" "$VTROOT/lib") export PKG_CONFIG_PATH +# According to https://github.com/etcd-io/etcd/blob/a621d807f061e1dd635033a8d6bc261461429e27/Documentation/op-guide/supported-platform.md, +# currently, etcd is unstable on arm64, so ETCD_UNSUPPORTED_ARCH should be set. +if [ "$(arch)" == aarch64 ]; then + export ETCD_UNSUPPORTED_ARCH=arm64 +fi + # Useful aliases. Remove if inconvenient. alias gt='cd $GOTOP' alias pt='cd $PYTOP' diff --git a/doc/V3HighLevelDesign.md b/doc/V3HighLevelDesign.md index b0ba3268a7b..5becdb0d46b 100644 --- a/doc/V3HighLevelDesign.md +++ b/doc/V3HighLevelDesign.md @@ -782,7 +782,7 @@ Recapitulating what we’ve covered so far: Once we start allowing joins and subqueries, we have a whole bunch of table aliases and relationships to deal with. We have to contend with name clashes, self-joins, as well as scoping rules. In a way, the vschema has acted as a static symbol table so far. But that’s not going to be enough any more. -The core of the symbol table will contain a map whose key will be a table alias, and the elements will be [similar to the table in vschema](https://github.com/vitessio/vitess/blob/master/go/vt/vtgate/planbuilder/schema.go#L22). However, it will also contain a column list that will be built as the query is parsed. +The core of the symbol table will contain a map whose key will be a table alias, and the elements will be [similar to the table in vschema](https://github.com/vitessio/vitess/blob/0b3de7c4a2de8daec545f040639b55a835361685/go/vt/vtgate/vindexes/vschema.go#L82). However, it will also contain a column list that will be built as the query is parsed. ### A simple example diff --git a/doc/VitessQueues.md b/doc/VitessQueues.md index 20a2b951669..6b0d02823d8 100644 --- a/doc/VitessQueues.md +++ b/doc/VitessQueues.md @@ -79,7 +79,7 @@ capabilities, the usual horizontal sharding process can be used. Queue Tables are marked in the schema by a comment, in a similar way we detect Sequence Tables -[now](https://github.com/vitessio/vitess/blob/master/go/vt/tabletserver/table_info.go#L37). +[now](https://github.com/vitessio/vitess/blob/0b3de7c4a2de8daec545f040639b55a835361685/go/vt/vttablet/tabletserver/tabletserver.go#L138). When a tablet becomes a master, and there are Queue tables, it creates a QueueManager for each of them. diff --git a/docker/lite/Dockerfile b/docker/lite/Dockerfile index 268a68fffce..55ca5f969f9 100644 --- a/docker/lite/Dockerfile +++ b/docker/lite/Dockerfile @@ -40,6 +40,13 @@ RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y --no-ins gnupg dirmngr ca-certificates wget libdbd-mysql-perl rsync libaio1 libatomic1 libcurl3 libev4 \ && for i in $(seq 1 10); do apt-key adv --no-tty --recv-keys --keyserver keyserver.ubuntu.com 8C718D3B5072E1F5 && break; done \ && echo 'deb http://repo.mysql.com/apt/debian/ stretch mysql-5.7' > /etc/apt/sources.list.d/mysql.list \ + && for i in $(seq 1 10); do apt-key adv --no-tty --keyserver keys.gnupg.net --recv-keys 9334A25F8507EFA5 && break; done \ + && echo 'deb http://repo.percona.com/apt stretch main' > /etc/apt/sources.list.d/percona.list && \ + { \ + echo debconf debconf/frontend select Noninteractive; \ + echo percona-server-server-5.7 percona-server-server/root_password password 'unused'; \ + echo percona-server-server-5.7 percona-server-server/root_password_again password 'unused'; \ + } | debconf-set-selections \ && apt-get update \ && DEBIAN_FRONTEND=noninteractive \ apt-get install -y --no-install-recommends \ @@ -47,9 +54,9 @@ RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y --no-ins libmysqlclient20 \ mysql-client \ mysql-server \ - && wget https://www.percona.com/downloads/XtraBackup/Percona-XtraBackup-2.4.13/binary/debian/stretch/x86_64/percona-xtrabackup-24_2.4.13-1.stretch_amd64.deb \ - && dpkg -i percona-xtrabackup-24_2.4.13-1.stretch_amd64.deb \ - && rm -f percona-xtrabackup-24_2.4.13-1.stretch_amd64.deb \ + libjemalloc1 \ + libtcmalloc-minimal4 \ + percona-xtrabackup-24 \ && rm -rf /var/lib/apt/lists/* \ && groupadd -r vitess && useradd -r -g vitess vitess diff --git a/examples/compose/README.md b/examples/compose/README.md index eb801a8fcbe..12f81422d64 100644 --- a/examples/compose/README.md +++ b/examples/compose/README.md @@ -95,13 +95,18 @@ vitess/examples/compose$ ./client.sh ### Connect to vgate and run queries vtgate responds to the MySQL protocol, so we can connect to it using the default MySQL client command line. -You can also use the `./lmysql.sh` helper script. ``` vitess/examples/compose$ mysql --port=15306 --host=127.0.0.1 -vitess/examples/compose$ ./lmysql.sh --port=15306 --host=127.0.0.1 ``` **Note that you may need to replace `127.0.0.1` with `docker ip` or `docker-machine ip`** +You can also use the `./lmysql.sh` helper script. +``` +vitess/examples/compose$ ./lmysql.sh --port=15306 --host= +``` + +where `` is `docker-machine ip` or external docker host ip addr + ### Play around with vtctl commands ``` diff --git a/examples/compose/lmysql.sh b/examples/compose/lmysql.sh old mode 100644 new mode 100755 diff --git a/go.mod b/go.mod index ae3b4d9aee4..6707d58528a 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 // indirect github.com/aws/aws-sdk-go v0.0.0-20180223184012-ebef4262e06a github.com/boltdb/bolt v1.3.1 // indirect + github.com/cespare/xxhash/v2 v2.1.1 github.com/cockroachdb/cmux v0.0.0-20170110192607-30d10be49292 // indirect github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd // indirect github.com/coreos/etcd v0.0.0-20170626015032-703663d1f6ed diff --git a/go.sum b/go.sum index ff202019b03..df4e4fb847c 100644 --- a/go.sum +++ b/go.sum @@ -29,6 +29,9 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4= github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= +github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= +github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= +github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag= github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= diff --git a/go/pools/resource_pool.go b/go/pools/resource_pool.go index 81055e4d0cb..63b8309137b 100644 --- a/go/pools/resource_pool.go +++ b/go/pools/resource_pool.go @@ -38,6 +38,9 @@ var ( // ErrTimeout is returned if a resource get times out. ErrTimeout = errors.New("resource pool timed out") + // ErrCtxTimeout is returned if a ctx is already expired by the time the resource pool is used + ErrCtxTimeout = errors.New("resource pool context already expired") + prefillTimeout = 30 * time.Second ) @@ -198,7 +201,7 @@ func (rp *ResourcePool) get(ctx context.Context) (resource Resource, err error) // If ctx has already expired, avoid racing with rp's resource channel. select { case <-ctx.Done(): - return nil, ErrTimeout + return nil, ErrCtxTimeout default: } diff --git a/go/pools/resource_pool_test.go b/go/pools/resource_pool_test.go index 2ec6f68c8b6..f3950e5e23e 100644 --- a/go/pools/resource_pool_test.go +++ b/go/pools/resource_pool_test.go @@ -639,7 +639,7 @@ func TestExpired(t *testing.T) { p.Put(r) } cancel() - want := "resource pool timed out" + want := "resource pool context already expired" if err == nil || err.Error() != want { t.Errorf("got %v, want %s", err, want) } diff --git a/go/test/endtoend/cluster/cluster_process.go b/go/test/endtoend/cluster/cluster_process.go new file mode 100644 index 00000000000..780917918ce --- /dev/null +++ b/go/test/endtoend/cluster/cluster_process.go @@ -0,0 +1,340 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster + +import ( + "flag" + "fmt" + "math/rand" + "os" + "path" + "time" + + "vitess.io/vitess/go/vt/log" +) + +// DefaultCell : If no cell name is passed, then use following +const DefaultCell = "zone1" + +var ( + keepData = flag.Bool("keep-data", false, "don't delete the per-test VTDATAROOT subfolders") +) + +// LocalProcessCluster Testcases need to use this to iniate a cluster +type LocalProcessCluster struct { + Keyspaces []Keyspace + Cell string + BaseTabletUID int + Hostname string + TopoPort int + TmpDirectory string + OriginalVTDATAROOT string + CurrentVTDATAROOT string + + VtgateMySQLPort int + VtgateGrpcPort int + VtctldHTTPPort int + + // standalone executable + VtctlclientProcess VtctlClientProcess + VtctlProcess VtctlProcess + + // background executable processes + topoProcess EtcdProcess + vtctldProcess VtctldProcess + VtgateProcess VtgateProcess + + nextPortForProcess int + + //Extra arguments for vtTablet + VtTabletExtraArgs []string + + //Extra arguments for vtGate + VtGateExtraArgs []string +} + +// Keyspace : Cluster accepts keyspace to launch it +type Keyspace struct { + Name string + SchemaSQL string + VSchema string + Shards []Shard +} + +// Shard with associated vttablets +type Shard struct { + Name string + Vttablets []Vttablet +} + +// Vttablet stores the properties needed to start a vttablet process +type Vttablet struct { + Type string + TabletUID int + HTTPPort int + GrpcPort int + MySQLPort int + + // background executable processes + mysqlctlProcess MysqlctlProcess + vttabletProcess VttabletProcess +} + +// StartTopo starts topology server +func (cluster *LocalProcessCluster) StartTopo() (err error) { + if cluster.Cell == "" { + cluster.Cell = DefaultCell + } + cluster.TopoPort = cluster.GetAndReservePort() + cluster.TmpDirectory = path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/tmp_%d", cluster.GetAndReservePort())) + cluster.topoProcess = *EtcdProcessInstance(cluster.TopoPort, cluster.GetAndReservePort(), cluster.Hostname, "global") + log.Info(fmt.Sprintf("Starting etcd server on port : %d", cluster.TopoPort)) + if err = cluster.topoProcess.Setup(); err != nil { + log.Error(err.Error()) + return + } + + log.Info("Creating topo dirs") + if err = cluster.topoProcess.ManageTopoDir("mkdir", "/vitess/global"); err != nil { + log.Error(err.Error()) + return + } + + if err = cluster.topoProcess.ManageTopoDir("mkdir", "/vitess/"+cluster.Cell); err != nil { + log.Error(err.Error()) + return + } + + log.Info("Adding cell info") + cluster.VtctlProcess = *VtctlProcessInstance(cluster.topoProcess.Port, cluster.Hostname) + if err = cluster.VtctlProcess.AddCellInfo(cluster.Cell); err != nil { + log.Error(err) + return + } + + cluster.vtctldProcess = *VtctldProcessInstance(cluster.GetAndReservePort(), cluster.GetAndReservePort(), cluster.topoProcess.Port, cluster.Hostname, cluster.TmpDirectory) + log.Info(fmt.Sprintf("Starting vtctld server on port : %d", cluster.vtctldProcess.Port)) + cluster.VtctldHTTPPort = cluster.vtctldProcess.Port + if err = cluster.vtctldProcess.Setup(cluster.Cell); err != nil { + log.Error(err.Error()) + return + } + + cluster.VtctlclientProcess = *VtctlClientProcessInstance("localhost", cluster.vtctldProcess.GrpcPort, cluster.TmpDirectory) + return +} + +// StartUnshardedKeyspace starts unshared keyspace with shard name as "0" +func (cluster *LocalProcessCluster) StartUnshardedKeyspace(keyspace Keyspace, replicaCount int, rdonly bool) error { + return cluster.StartKeyspace(keyspace, []string{"0"}, replicaCount, rdonly) +} + +// StartKeyspace starts required number of shard and the corresponding tablets +// keyspace : struct containing keyspace name, Sqlschema to apply, VSchema to apply +// shardName : list of shard names +// replicaCount: total number of replicas excluding master and rdonly +// rdonly: whether readonly tablets needed +func (cluster *LocalProcessCluster) StartKeyspace(keyspace Keyspace, shardNames []string, replicaCount int, rdonly bool) (err error) { + totalTabletsRequired := replicaCount + 1 // + 1 is for master + if rdonly { + totalTabletsRequired = totalTabletsRequired + 1 // + 1 for rdonly + } + shards := make([]Shard, 0) + log.Info("Starting keyspace : " + keyspace.Name) + _ = cluster.VtctlProcess.CreateKeyspace(keyspace.Name) + for _, shardName := range shardNames { + shard := &Shard{ + Name: shardName, + } + log.Info("Starting shard : " + shardName) + for i := 0; i < totalTabletsRequired; i++ { + // instantiate vttable object with reserved ports + tablet := &Vttablet{ + TabletUID: cluster.GetAndReserveTabletUID(), + HTTPPort: cluster.GetAndReservePort(), + GrpcPort: cluster.GetAndReservePort(), + MySQLPort: cluster.GetAndReservePort(), + } + if i == 0 { // Make the first one as master + tablet.Type = "master" + } else if i == totalTabletsRequired-1 && rdonly { // Make the last one as rdonly if rdonly flag is passed + tablet.Type = "rdonly" + } + // Start Mysqlctl process + log.Info(fmt.Sprintf("Starting mysqlctl for table uid %d, mysql port %d", tablet.TabletUID, tablet.MySQLPort)) + tablet.mysqlctlProcess = *MysqlCtlProcessInstance(tablet.TabletUID, tablet.MySQLPort, cluster.TmpDirectory) + if err = tablet.mysqlctlProcess.Start(); err != nil { + log.Error(err.Error()) + return + } + + // start vttablet process + tablet.vttabletProcess = *VttabletProcessInstance(tablet.HTTPPort, + tablet.GrpcPort, + tablet.TabletUID, + cluster.Cell, + shardName, + keyspace.Name, + cluster.vtctldProcess.Port, + tablet.Type, + cluster.topoProcess.Port, + cluster.Hostname, + cluster.TmpDirectory, + cluster.VtTabletExtraArgs) + log.Info(fmt.Sprintf("Starting vttablet for tablet uid %d, grpc port %d", tablet.TabletUID, tablet.GrpcPort)) + + if err = tablet.vttabletProcess.Setup(); err != nil { + log.Error(err.Error()) + return + } + + shard.Vttablets = append(shard.Vttablets, *tablet) + } + + // Make first tablet as master + if err = cluster.VtctlclientProcess.InitShardMaster(keyspace.Name, shardName, cluster.Cell, shard.Vttablets[0].TabletUID); err != nil { + log.Error(err.Error()) + return + } + + shards = append(shards, *shard) + } + keyspace.Shards = shards + cluster.Keyspaces = append(cluster.Keyspaces, keyspace) + + // Apply Schema SQL + if err = cluster.VtctlclientProcess.ApplySchema(keyspace.Name, keyspace.SchemaSQL); err != nil { + log.Error(err.Error()) + return + } + + //Apply VSchema + if keyspace.VSchema != "" { + if err = cluster.VtctlclientProcess.ApplyVSchema(keyspace.Name, keyspace.VSchema); err != nil { + log.Error(err.Error()) + return + } + } + + log.Info("Done creating keyspace : " + keyspace.Name) + return +} + +// StartVtgate starts vtgate +func (cluster *LocalProcessCluster) StartVtgate() (err error) { + vtgateHTTPPort := cluster.GetAndReservePort() + vtgateGrpcPort := cluster.GetAndReservePort() + cluster.VtgateMySQLPort = cluster.GetAndReservePort() + log.Info(fmt.Sprintf("Starting vtgate on port %d", vtgateHTTPPort)) + cluster.VtgateProcess = *VtgateProcessInstance( + vtgateHTTPPort, + vtgateGrpcPort, + cluster.VtgateMySQLPort, + cluster.Cell, + cluster.Cell, + cluster.Hostname, + "MASTER,REPLICA", + cluster.topoProcess.Port, + cluster.TmpDirectory, + cluster.VtGateExtraArgs) + + log.Info(fmt.Sprintf("Vtgate started, connect to mysql using : mysql -h 127.0.0.1 -P %d", cluster.VtgateMySQLPort)) + return cluster.VtgateProcess.Setup() +} + +// NewCluster instantiates a new cluster +func NewCluster(cell string, hostname string) *LocalProcessCluster { + cluster := &LocalProcessCluster{Cell: cell, Hostname: hostname} + cluster.OriginalVTDATAROOT = os.Getenv("VTDATAROOT") + cluster.CurrentVTDATAROOT = path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("vtroot_%d", cluster.GetAndReservePort())) + _ = createDirectory(cluster.CurrentVTDATAROOT, 0700) + _ = os.Setenv("VTDATAROOT", cluster.CurrentVTDATAROOT) + rand.Seed(time.Now().UTC().UnixNano()) + return cluster +} + +// ReStartVtgate starts vtgate with updated configs +func (cluster *LocalProcessCluster) ReStartVtgate() (err error) { + err = cluster.VtgateProcess.TearDown() + if err != nil { + log.Error(err.Error()) + return + } + err = cluster.StartVtgate() + if err != nil { + log.Error(err.Error()) + return + } + return err +} + +// Teardown brings down the cluster by invoking teardown for individual processes +func (cluster *LocalProcessCluster) Teardown() (err error) { + if err = cluster.VtgateProcess.TearDown(); err != nil { + log.Error(err.Error()) + return + } + + for _, keyspace := range cluster.Keyspaces { + for _, shard := range keyspace.Shards { + for _, tablet := range shard.Vttablets { + if err = tablet.mysqlctlProcess.Stop(); err != nil { + log.Error(err.Error()) + return + } + + if err = tablet.vttabletProcess.TearDown(); err != nil { + log.Error(err.Error()) + return + } + } + } + } + + if err = cluster.vtctldProcess.TearDown(); err != nil { + log.Error(err.Error()) + return + } + + if err = cluster.topoProcess.TearDown(cluster.Cell, cluster.OriginalVTDATAROOT, cluster.CurrentVTDATAROOT, *keepData); err != nil { + log.Error(err.Error()) + return + } + return err +} + +// GetAndReservePort gives port for required process +func (cluster *LocalProcessCluster) GetAndReservePort() int { + if cluster.nextPortForProcess == 0 { + cluster.nextPortForProcess = getRandomNumber(20000, 15000) + } + cluster.nextPortForProcess = cluster.nextPortForProcess + 1 + return cluster.nextPortForProcess +} + +// GetAndReserveTabletUID gives tablet uid +func (cluster *LocalProcessCluster) GetAndReserveTabletUID() int { + if cluster.BaseTabletUID == 0 { + cluster.BaseTabletUID = getRandomNumber(10000, 0) + } + cluster.BaseTabletUID = cluster.BaseTabletUID + 1 + return cluster.BaseTabletUID +} + +func getRandomNumber(maxNumber int32, baseNumber int) int { + return int(rand.Int31n(maxNumber)) + baseNumber +} diff --git a/go/test/endtoend/cluster/etcd_process.go b/go/test/endtoend/cluster/etcd_process.go new file mode 100644 index 00000000000..284a849d3ca --- /dev/null +++ b/go/test/endtoend/cluster/etcd_process.go @@ -0,0 +1,178 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster + +import ( + "fmt" + "net/http" + "os" + "os/exec" + "path" + "strings" + "syscall" + "time" + + "vitess.io/vitess/go/vt/log" +) + +// EtcdProcess is a generic handle for a running Etcd . +// It can be spawned manually +type EtcdProcess struct { + Name string + Binary string + DataDirectory string + ListenClientURL string + AdvertiseClientURL string + Port int + PeerPort int + Host string + VerifyURL string + PeerURL string + + proc *exec.Cmd + exit chan error +} + +// Setup spawns a new etcd service and initializes it with the defaults. +// The service is kept running in the background until TearDown() is called. +func (etcd *EtcdProcess) Setup() (err error) { + etcd.proc = exec.Command( + etcd.Binary, + "--name", etcd.Name, + "--data-dir", etcd.DataDirectory, + "--listen-client-urls", etcd.ListenClientURL, + "--advertise-client-urls", etcd.AdvertiseClientURL, + "--initial-advertise-peer-urls", etcd.PeerURL, + "--listen-peer-urls", etcd.PeerURL, + "--initial-cluster", fmt.Sprintf("%s=%s", etcd.Name, etcd.PeerURL), + ) + + etcd.proc.Stderr = os.Stderr + etcd.proc.Stdout = os.Stdout + + etcd.proc.Env = append(etcd.proc.Env, os.Environ()...) + + log.Infof("%v %v", strings.Join(etcd.proc.Args, " ")) + println("Starting etcd with args " + strings.Join(etcd.proc.Args, " ")) + err = etcd.proc.Start() + if err != nil { + return + } + + etcd.exit = make(chan error) + go func() { + etcd.exit <- etcd.proc.Wait() + }() + + timeout := time.Now().Add(60 * time.Second) + for time.Now().Before(timeout) { + if etcd.IsHealthy() { + return + } + select { + case err := <-etcd.exit: + return fmt.Errorf("process '%s' exited prematurely (err: %s)", etcd.Binary, err) + default: + time.Sleep(300 * time.Millisecond) + } + } + + return fmt.Errorf("process '%s' timed out after 60s (err: %s)", etcd.Binary, <-etcd.exit) +} + +// TearDown shutdowns the running mysqld service +func (etcd *EtcdProcess) TearDown(Cell string, originalVtRoot string, currentRoot string, keepdata bool) error { + if etcd.proc == nil || etcd.exit == nil { + return nil + } + + etcd.removeTopoDirectories(Cell) + + // Attempt graceful shutdown with SIGTERM first + _ = etcd.proc.Process.Signal(syscall.SIGTERM) + if !*keepData { + _ = os.RemoveAll(etcd.DataDirectory) + _ = os.RemoveAll(currentRoot) + } + _ = os.Setenv("VTDATAROOT", originalVtRoot) + select { + case err := <-etcd.exit: + etcd.proc = nil + return err + + case <-time.After(10 * time.Second): + etcd.proc.Process.Kill() + etcd.proc = nil + return <-etcd.exit + } + +} + +// IsHealthy function checks if etcd server is up and running +func (etcd *EtcdProcess) IsHealthy() bool { + resp, err := http.Get(etcd.VerifyURL) + if err != nil { + return false + } + if resp.StatusCode == 200 { + return true + } + return false +} + +func (etcd *EtcdProcess) removeTopoDirectories(Cell string) { + _ = etcd.ManageTopoDir("rmdir", "/vitess/global") + _ = etcd.ManageTopoDir("rmdir", "/vitess/"+Cell) +} + +// ManageTopoDir creates global and zone in etcd2 +func (etcd *EtcdProcess) ManageTopoDir(command string, directory string) (err error) { + url := etcd.VerifyURL + directory + payload := strings.NewReader(`{"dir":"true"}`) + if command == "mkdir" { + req, _ := http.NewRequest("PUT", url, payload) + req.Header.Add("content-type", "application/json") + _, err = http.DefaultClient.Do(req) + return err + } else if command == "rmdir" { + req, _ := http.NewRequest("DELETE", url+"?dir=true", payload) + _, err = http.DefaultClient.Do(req) + return err + } else { + return nil + } +} + +// EtcdProcessInstance returns a EtcdProcess handle for a etcd sevice, +// configured with the given Config. +// The process must be manually started by calling setup() +func EtcdProcessInstance(port int, peerPort int, hostname string, name string) *EtcdProcess { + etcd := &EtcdProcess{ + Name: name, + Binary: "etcd", + Port: port, + Host: hostname, + PeerPort: peerPort, + } + + etcd.AdvertiseClientURL = fmt.Sprintf("http://%s:%d", etcd.Host, etcd.Port) + etcd.ListenClientURL = fmt.Sprintf("http://%s:%d", etcd.Host, etcd.Port) + etcd.DataDirectory = path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("%s_%d", "etcd", port)) + etcd.VerifyURL = fmt.Sprintf("http://%s:%d/v2/keys", etcd.Host, etcd.Port) + etcd.PeerURL = fmt.Sprintf("http://%s:%d", hostname, peerPort) + return etcd +} diff --git a/go/test/endtoend/cluster/mysqlctl_process.go b/go/test/endtoend/cluster/mysqlctl_process.go new file mode 100644 index 00000000000..baec38d391d --- /dev/null +++ b/go/test/endtoend/cluster/mysqlctl_process.go @@ -0,0 +1,85 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster + +import ( + "fmt" + "os" + "os/exec" + "path" +) + +// MysqlctlProcess is a generic handle for a running mysqlctl command . +// It can be spawned manually +type MysqlctlProcess struct { + Name string + Binary string + LogDirectory string + TabletUID int + MySQLPort int + InitDBFile string +} + +// InitDb executes mysqlctl command to add cell info +func (mysqlctl *MysqlctlProcess) InitDb() (err error) { + tmpProcess := exec.Command( + mysqlctl.Binary, + "-log_dir", mysqlctl.LogDirectory, + "-tablet_uid", fmt.Sprintf("%d", mysqlctl.TabletUID), + "-mysql_port", fmt.Sprintf("%d", mysqlctl.MySQLPort), + "init", + "-init_db_sql_file", mysqlctl.InitDBFile, + ) + return tmpProcess.Run() +} + +// Start executes mysqlctl command to start mysql instance +func (mysqlctl *MysqlctlProcess) Start() (err error) { + tmpProcess := exec.Command( + mysqlctl.Binary, + "-log_dir", mysqlctl.LogDirectory, + "-tablet_uid", fmt.Sprintf("%d", mysqlctl.TabletUID), + "-mysql_port", fmt.Sprintf("%d", mysqlctl.MySQLPort), + "init", + "-init_db_sql_file", mysqlctl.InitDBFile, + ) + return tmpProcess.Run() +} + +// Stop executes mysqlctl command to stop mysql instance +func (mysqlctl *MysqlctlProcess) Stop() (err error) { + tmpProcess := exec.Command( + mysqlctl.Binary, + "-tablet_uid", fmt.Sprintf("%d", mysqlctl.TabletUID), + "shutdown", + ) + return tmpProcess.Start() +} + +// MysqlCtlProcessInstance returns a Mysqlctl handle for mysqlctl process +// configured with the given Config. +func MysqlCtlProcessInstance(tabletUID int, mySQLPort int, tmpDirectory string) *MysqlctlProcess { + mysqlctl := &MysqlctlProcess{ + Name: "mysqlctl", + Binary: "mysqlctl", + LogDirectory: tmpDirectory, + InitDBFile: path.Join(os.Getenv("VTROOT"), "/config/init_db.sql"), + } + mysqlctl.MySQLPort = mySQLPort + mysqlctl.TabletUID = tabletUID + return mysqlctl +} diff --git a/go/test/endtoend/cluster/vtctl_process.go b/go/test/endtoend/cluster/vtctl_process.go new file mode 100644 index 00000000000..74511b5c70f --- /dev/null +++ b/go/test/endtoend/cluster/vtctl_process.go @@ -0,0 +1,79 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster + +import ( + "fmt" + "os/exec" + "strings" + + "vitess.io/vitess/go/vt/log" +) + +// VtctlProcess is a generic handle for a running vtctl command . +// It can be spawned manually +type VtctlProcess struct { + Name string + Binary string + TopoImplementation string + TopoGlobalAddress string + TopoGlobalRoot string + TopoServerAddress string +} + +// AddCellInfo executes vtctl command to add cell info +func (vtctl *VtctlProcess) AddCellInfo(Cell string) (err error) { + tmpProcess := exec.Command( + vtctl.Binary, + "-topo_implementation", vtctl.TopoImplementation, + "-topo_global_server_address", vtctl.TopoGlobalAddress, + "-topo_global_root", vtctl.TopoGlobalRoot, + "AddCellInfo", + "-root", "/vitess/"+Cell, + "-server_address", vtctl.TopoServerAddress, + Cell, + ) + return tmpProcess.Run() +} + +// CreateKeyspace executes vtctl command to create keyspace +func (vtctl *VtctlProcess) CreateKeyspace(keyspace string) (err error) { + tmpProcess := exec.Command( + vtctl.Binary, + "-topo_implementation", vtctl.TopoImplementation, + "-topo_global_server_address", vtctl.TopoGlobalAddress, + "-topo_global_root", vtctl.TopoGlobalRoot, + "CreateKeyspace", keyspace, + ) + log.Info(fmt.Sprintf("Starting CreateKeyspace with arguments %v", strings.Join(tmpProcess.Args, " "))) + return tmpProcess.Run() +} + +// VtctlProcessInstance returns a VtctlProcess handle for vtctl process +// configured with the given Config. +// The process must be manually started by calling setup() +func VtctlProcessInstance(topoPort int, hostname string) *VtctlProcess { + vtctl := &VtctlProcess{ + Name: "vtctl", + Binary: "vtctl", + TopoImplementation: "etcd2", + TopoGlobalAddress: fmt.Sprintf("%s:%d", hostname, topoPort), + TopoGlobalRoot: "/vitess/global", + TopoServerAddress: fmt.Sprintf("%s:%d", hostname, topoPort), + } + return vtctl +} diff --git a/go/test/endtoend/cluster/vtctlclient_process.go b/go/test/endtoend/cluster/vtctlclient_process.go new file mode 100644 index 00000000000..982f0fd6a70 --- /dev/null +++ b/go/test/endtoend/cluster/vtctlclient_process.go @@ -0,0 +1,98 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster + +import ( + "fmt" + "os/exec" + "strings" + + "vitess.io/vitess/go/vt/log" +) + +// VtctlClientProcess is a generic handle for a running vtctlclient command . +// It can be spawned manually +type VtctlClientProcess struct { + Name string + Binary string + Server string + TempDirectory string + ZoneName string +} + +// InitShardMaster executes vtctlclient command to make one of tablet as master +func (vtctlclient *VtctlClientProcess) InitShardMaster(Keyspace string, Shard string, Cell string, TabletUID int) (err error) { + return vtctlclient.ExecuteCommand( + "InitShardMaster", + "-force", + fmt.Sprintf("%s/%s", Keyspace, Shard), + fmt.Sprintf("%s-%d", Cell, TabletUID)) +} + +// ApplySchema applies SQL schema to the keyspace +func (vtctlclient *VtctlClientProcess) ApplySchema(Keyspace string, SQL string) (err error) { + return vtctlclient.ExecuteCommand( + "ApplySchema", + "-sql", SQL, + Keyspace) +} + +// ApplyVSchema applies vitess schema (JSON format) to the keyspace +func (vtctlclient *VtctlClientProcess) ApplyVSchema(Keyspace string, JSON string) (err error) { + return vtctlclient.ExecuteCommand( + "ApplyVSchema", + "-vschema", JSON, + Keyspace, + ) +} + +// ExecuteCommand executes any vtctlclient command +func (vtctlclient *VtctlClientProcess) ExecuteCommand(args ...string) (err error) { + args = append([]string{"-server", vtctlclient.Server}, args...) + tmpProcess := exec.Command( + vtctlclient.Binary, + args..., + ) + println(fmt.Sprintf("Executing vtctlclient with arguments %v", strings.Join(tmpProcess.Args, " "))) + log.Info(fmt.Sprintf("Executing vtctlclient with arguments %v", strings.Join(tmpProcess.Args, " "))) + return tmpProcess.Run() +} + +// ExecuteCommandWithOutput executes any vtctlclient command and returns output +func (vtctlclient *VtctlClientProcess) ExecuteCommandWithOutput(args ...string) (result string, err error) { + args = append([]string{"-server", vtctlclient.Server}, args...) + tmpProcess := exec.Command( + vtctlclient.Binary, + args..., + ) + println(fmt.Sprintf("Executing vtctlclient with arguments %v", strings.Join(tmpProcess.Args, " "))) + log.Info(fmt.Sprintf("Executing vtctlclient with arguments %v", strings.Join(tmpProcess.Args, " "))) + resultByte, err := tmpProcess.CombinedOutput() + return string(resultByte), err +} + +// VtctlClientProcessInstance returns a VtctlProcess handle for vtctlclient process +// configured with the given Config. +func VtctlClientProcessInstance(hostname string, grpcPort int, tmpDirectory string) *VtctlClientProcess { + vtctlclient := &VtctlClientProcess{ + Name: "vtctlclient", + Binary: "vtctlclient", + Server: fmt.Sprintf("%s:%d", hostname, grpcPort), + TempDirectory: tmpDirectory, + } + return vtctlclient +} diff --git a/go/test/endtoend/cluster/vtctld_process.go b/go/test/endtoend/cluster/vtctld_process.go new file mode 100644 index 00000000000..d8666b40912 --- /dev/null +++ b/go/test/endtoend/cluster/vtctld_process.go @@ -0,0 +1,173 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster + +import ( + "fmt" + "net/http" + "os" + "os/exec" + "path" + "strings" + "syscall" + "time" + + "vitess.io/vitess/go/vt/log" +) + +// VtctldProcess is a generic handle for a running vtctld . +// It can be spawned manually +type VtctldProcess struct { + Name string + Binary string + CommonArg VtctlProcess + WebDir string + WebDir2 string + ServiceMap string + BackupStorageImplementation string + FileBackupStorageRoot string + LogDir string + Port int + GrpcPort int + PidFile string + VerifyURL string + Directory string + + proc *exec.Cmd + exit chan error +} + +// Setup starts vtctld process with required arguements +func (vtctld *VtctldProcess) Setup(Cell string) (err error) { + _ = createDirectory(vtctld.LogDir, 0700) + _ = createDirectory(path.Join(vtctld.Directory, "backups"), 0700) + vtctld.proc = exec.Command( + vtctld.Binary, + "-enable_queries", + "-topo_implementation", vtctld.CommonArg.TopoImplementation, + "-topo_global_server_address", vtctld.CommonArg.TopoGlobalAddress, + "-topo_global_root", vtctld.CommonArg.TopoGlobalRoot, + "-cell", Cell, + "-web_dir", vtctld.WebDir, + "-web_dir2", vtctld.WebDir2, + "-workflow_manager_init", + "-workflow_manager_use_election", + "-service_map", vtctld.ServiceMap, + "-backup_storage_implementation", vtctld.BackupStorageImplementation, + "-file_backup_storage_root", vtctld.FileBackupStorageRoot, + "-log_dir", vtctld.LogDir, + "-port", fmt.Sprintf("%d", vtctld.Port), + "-grpc_port", fmt.Sprintf("%d", vtctld.GrpcPort), + "-pid_file", vtctld.PidFile, + ) + + vtctld.proc.Stderr = os.Stderr + vtctld.proc.Stdout = os.Stdout + + vtctld.proc.Env = append(vtctld.proc.Env, os.Environ()...) + + log.Infof("%v %v", strings.Join(vtctld.proc.Args, " ")) + + err = vtctld.proc.Start() + if err != nil { + return + } + + vtctld.exit = make(chan error) + go func() { + vtctld.exit <- vtctld.proc.Wait() + }() + + timeout := time.Now().Add(60 * time.Second) + for time.Now().Before(timeout) { + if vtctld.IsHealthy() { + return nil + } + select { + case err := <-vtctld.exit: + return fmt.Errorf("process '%s' exited prematurely (err: %s)", vtctld.Name, err) + default: + time.Sleep(300 * time.Millisecond) + } + } + + return fmt.Errorf("process '%s' timed out after 60s (err: %s)", vtctld.Name, <-vtctld.exit) +} + +func createDirectory(dirName string, mode os.FileMode) error { + if _, err := os.Stat(dirName); os.IsNotExist(err) { + return os.Mkdir(dirName, mode) + } + return nil +} + +// IsHealthy function checks if vtctld process is up and running +func (vtctld *VtctldProcess) IsHealthy() bool { + resp, err := http.Get(vtctld.VerifyURL) + if err != nil { + return false + } + if resp.StatusCode == 200 { + return true + } + return false +} + +// TearDown shutdowns the running vtctld service +func (vtctld *VtctldProcess) TearDown() error { + if vtctld.proc == nil || vtctld.exit == nil { + return nil + } + + // Attempt graceful shutdown with SIGTERM first + vtctld.proc.Process.Signal(syscall.SIGTERM) + + select { + case err := <-vtctld.exit: + vtctld.proc = nil + return err + + case <-time.After(10 * time.Second): + vtctld.proc.Process.Kill() + vtctld.proc = nil + return <-vtctld.exit + } +} + +// VtctldProcessInstance returns a VtctlProcess handle for vtctl process +// configured with the given Config. +// The process must be manually started by calling setup() +func VtctldProcessInstance(httpPort int, grpcPort int, topoPort int, hostname string, tmpDirectory string) *VtctldProcess { + vtctl := VtctlProcessInstance(topoPort, hostname) + vtctld := &VtctldProcess{ + Name: "vtctld", + Binary: "vtctld", + CommonArg: *vtctl, + WebDir: path.Join(os.Getenv("VTROOT"), "/web/vtctld"), + WebDir2: path.Join(os.Getenv("VTROOT"), "/web/vtctld2/app"), + ServiceMap: "grpc-vtctl", + BackupStorageImplementation: "file", + FileBackupStorageRoot: path.Join(os.Getenv("VTDATAROOT"), "/backups"), + LogDir: tmpDirectory, + Port: httpPort, + GrpcPort: grpcPort, + PidFile: path.Join(tmpDirectory, "vtctld.pid"), + Directory: os.Getenv("VTDATAROOT"), + } + vtctld.VerifyURL = fmt.Sprintf("http://%s:%d/debug/vars", hostname, vtctld.Port) + return vtctld +} diff --git a/go/test/endtoend/cluster/vtgate_process.go b/go/test/endtoend/cluster/vtgate_process.go new file mode 100644 index 00000000000..e2b771735ad --- /dev/null +++ b/go/test/endtoend/cluster/vtgate_process.go @@ -0,0 +1,197 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "os" + "os/exec" + "path" + "reflect" + "strings" + "syscall" + "time" + + "vitess.io/vitess/go/vt/log" +) + +// VtgateProcess is a generic handle for a running vtgate . +// It can be spawned manually +type VtgateProcess struct { + Name string + Binary string + CommonArg VtctlProcess + LogDir string + FileToLogQueries string + Port int + GrpcPort int + MySQLServerPort int + MySQLServerSocketPath string + Cell string + CellsToWatch string + TabletTypesToWait string + GatewayImplementation string + ServiceMap string + PidFile string + MySQLAuthServerImpl string + Directory string + VerifyURL string + //Extra Args to be set before starting the vtgate process + ExtraArgs []string + + proc *exec.Cmd + exit chan error +} + +// Setup starts Vtgate process with required arguements +func (vtgate *VtgateProcess) Setup() (err error) { + + vtgate.proc = exec.Command( + vtgate.Binary, + "-topo_implementation", vtgate.CommonArg.TopoImplementation, + "-topo_global_server_address", vtgate.CommonArg.TopoGlobalAddress, + "-topo_global_root", vtgate.CommonArg.TopoGlobalRoot, + "-log_dir", vtgate.LogDir, + "-log_queries_to_file", vtgate.FileToLogQueries, + "-port", fmt.Sprintf("%d", vtgate.Port), + "-grpc_port", fmt.Sprintf("%d", vtgate.GrpcPort), + "-mysql_server_port", fmt.Sprintf("%d", vtgate.MySQLServerPort), + "-mysql_server_socket_path", vtgate.MySQLServerSocketPath, + "-cell", vtgate.Cell, + "-cells_to_watch", vtgate.CellsToWatch, + "-tablet_types_to_wait", vtgate.TabletTypesToWait, + "-gateway_implementation", vtgate.GatewayImplementation, + "-service_map", vtgate.ServiceMap, + "-mysql_auth_server_impl", vtgate.MySQLAuthServerImpl, + "-pid_file", vtgate.PidFile, + ) + vtgate.proc.Args = append(vtgate.proc.Args, vtgate.ExtraArgs...) + + vtgate.proc.Stderr = os.Stderr + vtgate.proc.Stdout = os.Stdout + + vtgate.proc.Env = append(vtgate.proc.Env, os.Environ()...) + + log.Infof("%v %v", strings.Join(vtgate.proc.Args, " ")) + + err = vtgate.proc.Start() + if err != nil { + return + } + + vtgate.exit = make(chan error) + go func() { + vtgate.exit <- vtgate.proc.Wait() + }() + + timeout := time.Now().Add(60 * time.Second) + for time.Now().Before(timeout) { + if vtgate.WaitForStatus() { + return nil + } + select { + case err := <-vtgate.exit: + return fmt.Errorf("process '%s' exited prematurely (err: %s)", vtgate.Name, err) + default: + time.Sleep(300 * time.Millisecond) + } + } + + return fmt.Errorf("process '%s' timed out after 60s (err: %s)", vtgate.Name, <-vtgate.exit) +} + +// WaitForStatus function checks if vtgate process is up and running +func (vtgate *VtgateProcess) WaitForStatus() bool { + resp, err := http.Get(vtgate.VerifyURL) + if err != nil { + return false + } + if resp.StatusCode == 200 { + resultMap := make(map[string]interface{}) + respByte, _ := ioutil.ReadAll(resp.Body) + err := json.Unmarshal(respByte, &resultMap) + if err != nil { + panic(err) + } + object := reflect.ValueOf(resultMap["HealthcheckConnections"]) + masterConnectionExist := false + if object.Kind() == reflect.Map { + for _, key := range object.MapKeys() { + + if strings.Contains(key.String(),"master") { + masterConnectionExist = true + } + } + } + return masterConnectionExist + } + return false +} + +// TearDown shuts down the running vtgate service +func (vtgate *VtgateProcess) TearDown() error { + if vtgate.proc == nil || vtgate.exit == nil { + return nil + } + // Attempt graceful shutdown with SIGTERM first + vtgate.proc.Process.Signal(syscall.SIGTERM) + + select { + case err := <-vtgate.exit: + vtgate.proc = nil + return err + + case <-time.After(10 * time.Second): + vtgate.proc.Process.Kill() + vtgate.proc = nil + return <-vtgate.exit + } +} + +// VtgateProcessInstance returns a Vtgate handle for vtgate process +// configured with the given Config. +// The process must be manually started by calling setup() +func VtgateProcessInstance(port int, grpcPort int, mySQLServerPort int, cell string, cellsToWatch string, hostname string, tabletTypesToWait string, topoPort int, tmpDirectory string, extraArgs []string) *VtgateProcess { + vtctl := VtctlProcessInstance(topoPort, hostname) + vtgate := &VtgateProcess{ + Name: "vtgate", + Binary: "vtgate", + FileToLogQueries: path.Join(tmpDirectory, "/vtgate_querylog.txt"), + Directory: os.Getenv("VTDATAROOT"), + ServiceMap: "grpc-vtgateservice", + LogDir: tmpDirectory, + Port: port, + GrpcPort: grpcPort, + MySQLServerPort: mySQLServerPort, + MySQLServerSocketPath: path.Join(tmpDirectory, "mysql.sock"), + Cell: cell, + CellsToWatch: cellsToWatch, + TabletTypesToWait: tabletTypesToWait, + GatewayImplementation: "discoverygateway", + CommonArg: *vtctl, + PidFile: path.Join(tmpDirectory, "/vtgate.pid"), + MySQLAuthServerImpl: "none", + ExtraArgs: extraArgs, + } + + vtgate.VerifyURL = fmt.Sprintf("http://%s:%d/debug/vars", hostname, port) + + return vtgate +} diff --git a/go/test/endtoend/cluster/vttablet_process.go b/go/test/endtoend/cluster/vttablet_process.go new file mode 100644 index 00000000000..f3ced054d00 --- /dev/null +++ b/go/test/endtoend/cluster/vttablet_process.go @@ -0,0 +1,203 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "os" + "os/exec" + "path" + "strings" + "syscall" + "time" + + "vitess.io/vitess/go/vt/log" +) + +// VttabletProcess is a generic handle for a running vttablet . +// It can be spawned manually +type VttabletProcess struct { + Name string + Binary string + FileToLogQueries string + TabletUID int + TabletPath string + Cell string + Port int + GrpcPort int + PidFile string + Shard string + CommonArg VtctlProcess + LogDir string + TabletHostname string + Keyspace string + TabletType string + HealthCheckInterval int + BackupStorageImplementation string + FileBackupStorageRoot string + ServiceMap string + VtctldAddress string + Directory string + VerifyURL string + //Extra Args to be set before starting the vttablet process + ExtraArgs []string + + proc *exec.Cmd + exit chan error +} + +// Setup starts vtctld process with required arguements +func (vttablet *VttabletProcess) Setup() (err error) { + + vttablet.proc = exec.Command( + vttablet.Binary, + "-topo_implementation", vttablet.CommonArg.TopoImplementation, + "-topo_global_server_address", vttablet.CommonArg.TopoGlobalAddress, + "-topo_global_root", vttablet.CommonArg.TopoGlobalRoot, + "-log_queries_to_file", vttablet.FileToLogQueries, + "-tablet-path", vttablet.TabletPath, + "-port", fmt.Sprintf("%d", vttablet.Port), + "-grpc_port", fmt.Sprintf("%d", vttablet.GrpcPort), + "-pid_file", vttablet.PidFile, + "-init_shard", vttablet.Shard, + "-log_dir", vttablet.LogDir, + "-tablet_hostname", vttablet.TabletHostname, + "-init_keyspace", vttablet.Keyspace, + "-init_tablet_type", vttablet.TabletType, + "-health_check_interval", fmt.Sprintf("%ds", vttablet.HealthCheckInterval), + "-enable_semi_sync", + "-enable_replication_reporter", + "-backup_storage_implementation", vttablet.BackupStorageImplementation, + "-file_backup_storage_root", vttablet.FileBackupStorageRoot, + "-restore_from_backup", + "-service_map", vttablet.ServiceMap, + "-vtctld_addr", vttablet.VtctldAddress, + ) + vttablet.proc.Args = append(vttablet.proc.Args, vttablet.ExtraArgs...) + + vttablet.proc.Stderr = os.Stderr + vttablet.proc.Stdout = os.Stdout + + vttablet.proc.Env = append(vttablet.proc.Env, os.Environ()...) + + log.Infof("%v %v", strings.Join(vttablet.proc.Args, " ")) + + err = vttablet.proc.Start() + if err != nil { + return + } + + vttablet.exit = make(chan error) + go func() { + vttablet.exit <- vttablet.proc.Wait() + }() + + timeout := time.Now().Add(60 * time.Second) + for time.Now().Before(timeout) { + if vttablet.WaitForStatus("NOT_SERVING") { + return nil + } + select { + case err := <-vttablet.exit: + return fmt.Errorf("process '%s' exited prematurely (err: %s)", vttablet.Name, err) + default: + time.Sleep(300 * time.Millisecond) + } + } + + return fmt.Errorf("process '%s' timed out after 60s (err: %s)", vttablet.Name, <-vttablet.exit) +} + +// WaitForStatus function checks if vttablet process is up and running +func (vttablet *VttabletProcess) WaitForStatus(status string) bool { + resp, err := http.Get(vttablet.VerifyURL) + if err != nil { + return false + } + if resp.StatusCode == 200 { + resultMap := make(map[string]interface{}) + respByte, _ := ioutil.ReadAll(resp.Body) + err := json.Unmarshal(respByte, &resultMap) + if err != nil { + panic(err) + } + return resultMap["TabletStateName"] == status + } + return false +} + +// TearDown shuts down the running vttablet service +func (vttablet *VttabletProcess) TearDown() error { + if vttablet.proc == nil { + fmt.Printf("No process found for vttablet %d", vttablet.TabletUID) + } + if vttablet.proc == nil || vttablet.exit == nil { + return nil + } + // Attempt graceful shutdown with SIGTERM first + vttablet.proc.Process.Signal(syscall.SIGTERM) + + select { + case <-vttablet.exit: + vttablet.proc = nil + return nil + + case <-time.After(10 * time.Second): + vttablet.proc.Process.Kill() + vttablet.proc = nil + return <-vttablet.exit + } +} + +// VttabletProcessInstance returns a VttabletProcess handle for vttablet process +// configured with the given Config. +// The process must be manually started by calling setup() +func VttabletProcessInstance(port int, grpcPort int, tabletUID int, cell string, shard string, keyspace string, vtctldPort int, tabletType string, topoPort int, hostname string, tmpDirectory string, extraArgs []string) *VttabletProcess { + vtctl := VtctlProcessInstance(topoPort, hostname) + vttablet := &VttabletProcess{ + Name: "vttablet", + Binary: "vttablet", + FileToLogQueries: path.Join(tmpDirectory, fmt.Sprintf("/vt_%010d/querylog.txt", tabletUID)), + Directory: path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/vt_%010d", tabletUID)), + TabletPath: fmt.Sprintf("%s-%010d", cell, tabletUID), + ServiceMap: "grpc-queryservice,grpc-tabletmanager,grpc-updatestream", + LogDir: tmpDirectory, + Shard: shard, + TabletHostname: hostname, + Keyspace: keyspace, + TabletType: "replica", + CommonArg: *vtctl, + HealthCheckInterval: 5, + BackupStorageImplementation: "file", + FileBackupStorageRoot: path.Join(os.Getenv("VTDATAROOT"), "/backups"), + Port: port, + GrpcPort: grpcPort, + PidFile: path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/vt_%010d/vttablet.pid", tabletUID)), + VtctldAddress: fmt.Sprintf("http://%s:%d", hostname, vtctldPort), + ExtraArgs: extraArgs, + } + + if tabletType == "rdonly" { + vttablet.TabletType = tabletType + } + vttablet.VerifyURL = fmt.Sprintf("http://%s:%d/debug/vars", hostname, port) + + return vttablet +} diff --git a/go/test/endtoend/clustertest/add_keyspace_test.go b/go/test/endtoend/clustertest/add_keyspace_test.go new file mode 100644 index 00000000000..e305866b752 --- /dev/null +++ b/go/test/endtoend/clustertest/add_keyspace_test.go @@ -0,0 +1,85 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +This adds sharded keyspace dynamically in this test only and test sql insert, select +*/ + +package clustertest + +import ( + "context" + "fmt" + "testing" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/test/endtoend/cluster" +) + +var ( + testKeyspace = &cluster.Keyspace{ + Name: "kstest", + SchemaSQL: `create table vt_user ( +id bigint, +name varchar(64), +primary key (id) +) Engine=InnoDB`, + VSchema: `{ + "sharded": true, + "vindexes": { + "hash_index": { + "type": "hash" + } + }, + "tables": { + "vt_user": { + "column_vindexes": [ + { + "column": "id", + "name": "hash_index" + } + ] + } + } +}`, + } +) + +func TestAddKeyspace(t *testing.T) { + if err := clusterInstance.StartKeyspace(*testKeyspace, []string{"-80", "80-"}, 1, true); err != nil { + println(err.Error()) + t.Fatal(err) + } + // Restart vtgate process + _ = clusterInstance.VtgateProcess.TearDown() + _ = clusterInstance.VtgateProcess.Setup() + + ctx := context.Background() + vtParams := mysql.ConnParams{ + Host: clusterInstance.Hostname, + Port: clusterInstance.VtgateMySQLPort, + } + conn, err := mysql.Connect(ctx, &vtParams) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + exec(t, conn, "insert into vt_user(id, name) values(1,'name1')") + + qr := exec(t, conn, "select id, name from vt_user") + if got, want := fmt.Sprintf("%v", qr.Rows), `[[INT64(1) VARCHAR("name1")]]`; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } +} diff --git a/go/test/endtoend/clustertest/etcd_test.go b/go/test/endtoend/clustertest/etcd_test.go new file mode 100644 index 00000000000..cb0138b0d5e --- /dev/null +++ b/go/test/endtoend/clustertest/etcd_test.go @@ -0,0 +1,29 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package clustertest + +import ( + "fmt" + "testing" +) + +func TestEtcdServer(t *testing.T) { + etcdURL := fmt.Sprintf("http://%s:%d/v2/keys", clusterInstance.Hostname, clusterInstance.TopoPort) + testURL(t, etcdURL, "generic etcd url") + testURL(t, etcdURL+"/vitess/global", "vitess global key") + testURL(t, etcdURL+"/vitess/zone1", "vitess zone1 key") +} diff --git a/go/test/endtoend/clustertest/main_test.go b/go/test/endtoend/clustertest/main_test.go new file mode 100644 index 00000000000..11c48433ac3 --- /dev/null +++ b/go/test/endtoend/clustertest/main_test.go @@ -0,0 +1,114 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package clustertest + +import ( + "flag" + "net/http" + "os" + "testing" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/test/endtoend/cluster" +) + +var ( + clusterInstance *cluster.LocalProcessCluster + vtParams mysql.ConnParams + keyspaceName = "commerce" + cell = "zone1" + sqlSchema = `create table product( + sku varbinary(128), + description varbinary(128), + price bigint, + primary key(sku) + ) ENGINE=InnoDB; + create table customer( + id bigint not null auto_increment, + email varchar(128), + primary key(id) + ) ENGINE=InnoDB; + create table corder( + order_id bigint not null auto_increment, + customer_id bigint, + sku varbinary(128), + price bigint, + primary key(order_id) + ) ENGINE=InnoDB;` + + vSchema = `{ + "tables": { + "product": {}, + "customer": {}, + "corder": {} + } + }` +) + +func TestMain(m *testing.M) { + flag.Parse() + + exitCode := func() int { + clusterInstance = cluster.NewCluster(cell, "localhost") + defer clusterInstance.Teardown() + + // Start topo server + err := clusterInstance.StartTopo() + if err != nil { + return 1 + } + + // Start keyspace + keyspace := &cluster.Keyspace{ + Name: keyspaceName, + SchemaSQL: sqlSchema, + VSchema: vSchema, + } + err = clusterInstance.StartUnshardedKeyspace(*keyspace, 1, true) + if err != nil { + return 1 + } + + // Start vtgate + err = clusterInstance.StartVtgate() + if err != nil { + return 1 + } + vtParams = mysql.ConnParams{ + Host: clusterInstance.Hostname, + Port: clusterInstance.VtgateMySQLPort, + } + return m.Run() + }() + os.Exit(exitCode) +} + +func testURL(t *testing.T, url string, testCaseName string) { + statusCode := getStatusForURL(url) + if got, want := statusCode, 200; got != want { + t.Errorf("select:\n%v want\n%v for %s", got, want, testCaseName) + } +} + +// getStatusForUrl returns the status code for the URL +func getStatusForURL(url string) int { + resp, _ := http.Get(url) + if resp != nil { + return resp.StatusCode + } + return 0 +} diff --git a/go/test/endtoend/clustertest/vtcltd_test.go b/go/test/endtoend/clustertest/vtcltd_test.go new file mode 100644 index 00000000000..4704fd7f99a --- /dev/null +++ b/go/test/endtoend/clustertest/vtcltd_test.go @@ -0,0 +1,28 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +*/ + +package clustertest + +import ( + "fmt" + "testing" +) + +func TestVtctldProcess(t *testing.T) { + url := fmt.Sprintf("http://localhost:%d/api/keyspaces/", clusterInstance.VtctldHTTPPort) + testURL(t, url, "keyspace url") +} diff --git a/go/test/endtoend/clustertest/vtgate_test.go b/go/test/endtoend/clustertest/vtgate_test.go new file mode 100644 index 00000000000..67773139cc0 --- /dev/null +++ b/go/test/endtoend/clustertest/vtgate_test.go @@ -0,0 +1,52 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +This tests select/insert using the unshared keyspace added in main_test +*/ +package clustertest + +import ( + "context" + "fmt" + "testing" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/sqltypes" +) + +func TestVtgateProcess(t *testing.T) { + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + exec(t, conn, "insert into customer(id, email) values(1,'email1')") + + qr := exec(t, conn, "select id, email from customer") + if got, want := fmt.Sprintf("%v", qr.Rows), `[[INT64(1) VARCHAR("email1")]]`; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } +} + +func exec(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result { + t.Helper() + qr, err := conn.ExecuteFetch(query, 1000, true) + if err != nil { + t.Fatal(err) + } + return qr +} diff --git a/go/test/endtoend/clustertest/vttablet_test.go b/go/test/endtoend/clustertest/vttablet_test.go new file mode 100644 index 00000000000..30e7beeca15 --- /dev/null +++ b/go/test/endtoend/clustertest/vttablet_test.go @@ -0,0 +1,41 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +*/ + +package clustertest + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "testing" +) + +func TestVttabletProcess(t *testing.T) { + firstTabletPort := clusterInstance.Keyspaces[0].Shards[0].Vttablets[0].HTTPPort + testURL(t, fmt.Sprintf("http://localhost:%d/debug/vars/", firstTabletPort), "tablet debug var url") + resp, _ := http.Get(fmt.Sprintf("http://localhost:%d/debug/vars", firstTabletPort)) + resultMap := make(map[string]interface{}) + respByte, _ := ioutil.ReadAll(resp.Body) + err := json.Unmarshal(respByte, &resultMap) + if err != nil { + panic(err) + } + if got, want := resultMap["TabletKeyspace"], "commerce"; got != want { + t.Errorf("select:\n%v want\n%v for %s", got, want, "Keyspace of tablet should match") + } +} diff --git a/go/test/endtoend/vtgate/aggr_test.go b/go/test/endtoend/vtgate/aggr_test.go new file mode 100644 index 00000000000..e23e4a8970e --- /dev/null +++ b/go/test/endtoend/vtgate/aggr_test.go @@ -0,0 +1,57 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vtgate + +import ( + "context" + "fmt" + "testing" + + "vitess.io/vitess/go/mysql" +) + +func TestAggregateTypes(t *testing.T) { + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + exec(t, conn, "insert into aggr_test(id, val1, val2) values(1,'a',1), (2,'A',1), (3,'b',1), (4,'c',3), (5,'c',4)") + exec(t, conn, "insert into aggr_test(id, val1, val2) values(6,'d',null), (7,'e',null), (8,'E',1)") + + qr := exec(t, conn, "select val1, count(distinct val2), count(*) from aggr_test group by val1") + if got, want := fmt.Sprintf("%v", qr.Rows), `[[VARCHAR("a") INT64(1) INT64(2)] [VARCHAR("b") INT64(1) INT64(1)] [VARCHAR("c") INT64(2) INT64(2)] [VARCHAR("d") INT64(0) INT64(1)] [VARCHAR("e") INT64(1) INT64(2)]]`; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + + qr = exec(t, conn, "select val1, sum(distinct val2), sum(val2) from aggr_test group by val1") + if got, want := fmt.Sprintf("%v", qr.Rows), `[[VARCHAR("a") DECIMAL(1) DECIMAL(2)] [VARCHAR("b") DECIMAL(1) DECIMAL(1)] [VARCHAR("c") DECIMAL(7) DECIMAL(7)] [VARCHAR("d") NULL NULL] [VARCHAR("e") DECIMAL(1) DECIMAL(1)]]`; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + + qr = exec(t, conn, "select val1, count(distinct val2) k, count(*) from aggr_test group by val1 order by k desc, val1") + if got, want := fmt.Sprintf("%v", qr.Rows), `[[VARCHAR("c") INT64(2) INT64(2)] [VARCHAR("a") INT64(1) INT64(2)] [VARCHAR("b") INT64(1) INT64(1)] [VARCHAR("e") INT64(1) INT64(2)] [VARCHAR("d") INT64(0) INT64(1)]]`; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + + qr = exec(t, conn, "select val1, count(distinct val2) k, count(*) from aggr_test group by val1 order by k desc, val1 limit 4") + if got, want := fmt.Sprintf("%v", qr.Rows), `[[VARCHAR("c") INT64(2) INT64(2)] [VARCHAR("a") INT64(1) INT64(2)] [VARCHAR("b") INT64(1) INT64(1)] [VARCHAR("e") INT64(1) INT64(2)]]`; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } +} diff --git a/go/test/endtoend/vtgate/lookup_test.go b/go/test/endtoend/vtgate/lookup_test.go new file mode 100644 index 00000000000..126b2593609 --- /dev/null +++ b/go/test/endtoend/vtgate/lookup_test.go @@ -0,0 +1,270 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vtgate + +import ( + "context" + "fmt" + "strings" + "testing" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/sqltypes" +) + +func TestConsistentLookup(t *testing.T) { + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + // conn2 is for queries that target shards. + conn2, err := mysql.Connect(ctx, &vtParams) + if err != nil { + t.Fatal(err) + } + defer conn2.Close() + + // Simple insert. + exec(t, conn, "begin") + exec(t, conn, "insert into t1(id1, id2) values(1, 4)") + exec(t, conn, "commit") + qr := exec(t, conn, "select * from t1") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(1) INT64(4)]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + qr = exec(t, conn, "select * from t1_id2_idx") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(4) VARBINARY(\"\\x16k@\\xb4J\\xbaK\\xd6\")]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + + // Inserting again should fail. + exec(t, conn, "begin") + _, err = conn.ExecuteFetch("insert into t1(id1, id2) values(1, 4)", 1000, false) + exec(t, conn, "rollback") + want := "duplicate entry" + if err == nil || !strings.Contains(err.Error(), want) { + t.Errorf("second insert: %v, must contain %s", err, want) + } + + // Simple delete. + exec(t, conn, "begin") + exec(t, conn, "delete from t1 where id1=1") + exec(t, conn, "commit") + qr = exec(t, conn, "select * from t1") + if got, want := fmt.Sprintf("%v", qr.Rows), "[]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + qr = exec(t, conn, "select * from t1_id2_idx") + if got, want := fmt.Sprintf("%v", qr.Rows), "[]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + + // Autocommit insert. + exec(t, conn, "insert into t1(id1, id2) values(1, 4)") + qr = exec(t, conn, "select * from t1") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(1) INT64(4)]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + qr = exec(t, conn, "select id2 from t1_id2_idx") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(4)]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + // Autocommit delete. + exec(t, conn, "delete from t1 where id1=1") + + // Dangling row pointing to existing keyspace id. + exec(t, conn, "insert into t1(id1, id2) values(1, 4)") + // Delete the main row only. + exec(t, conn2, "use `ks:-80`") + exec(t, conn2, "delete from t1 where id1=1") + // Verify the lookup row is still there. + qr = exec(t, conn, "select id2 from t1_id2_idx") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(4)]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + // Insert should still succeed. + exec(t, conn, "begin") + exec(t, conn, "insert into t1(id1, id2) values(1, 4)") + exec(t, conn, "commit") + qr = exec(t, conn, "select * from t1") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(1) INT64(4)]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + // Lookup row should be unchanged. + qr = exec(t, conn, "select * from t1_id2_idx") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(4) VARBINARY(\"\\x16k@\\xb4J\\xbaK\\xd6\")]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + + // Dangling row not pointing to existing keyspace id. + exec(t, conn2, "use `ks:-80`") + exec(t, conn2, "delete from t1 where id1=1") + // Update the lookup row with bogus keyspace id. + exec(t, conn, "update t1_id2_idx set keyspace_id='aaa' where id2=4") + qr = exec(t, conn, "select * from t1_id2_idx") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(4) VARBINARY(\"aaa\")]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + // Insert should still succeed. + exec(t, conn, "begin") + exec(t, conn, "insert into t1(id1, id2) values(1, 4)") + exec(t, conn, "commit") + qr = exec(t, conn, "select * from t1") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(1) INT64(4)]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + // lookup row must be updated. + qr = exec(t, conn, "select * from t1_id2_idx") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(4) VARBINARY(\"\\x16k@\\xb4J\\xbaK\\xd6\")]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + + // Update, but don't change anything. This should not deadlock. + exec(t, conn, "begin") + exec(t, conn, "update t1 set id2=4 where id1=1") + exec(t, conn, "commit") + qr = exec(t, conn, "select * from t1") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(1) INT64(4)]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + qr = exec(t, conn, "select * from t1_id2_idx") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(4) VARBINARY(\"\\x16k@\\xb4J\\xbaK\\xd6\")]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + + // Update, and change the lookup value. This should change main and lookup rows. + exec(t, conn, "begin") + exec(t, conn, "update t1 set id2=5 where id1=1") + exec(t, conn, "commit") + qr = exec(t, conn, "select * from t1") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(1) INT64(5)]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + qr = exec(t, conn, "select * from t1_id2_idx") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(5) VARBINARY(\"\\x16k@\\xb4J\\xbaK\\xd6\")]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + exec(t, conn, "delete from t1 where id1=1") +} + +func TestConsistentLookupMultiInsert(t *testing.T) { + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + // conn2 is for queries that target shards. + conn2, err := mysql.Connect(ctx, &vtParams) + if err != nil { + t.Fatal(err) + } + defer conn2.Close() + + exec(t, conn, "begin") + exec(t, conn, "insert into t1(id1, id2) values(1,4), (2,5)") + exec(t, conn, "commit") + qr := exec(t, conn, "select * from t1") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(1) INT64(4)] [INT64(2) INT64(5)]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + qr = exec(t, conn, "select count(*) from t1_id2_idx") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(2)]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + + // Delete one row but leave its lookup dangling. + exec(t, conn2, "use `ks:-80`") + exec(t, conn2, "delete from t1 where id1=1") + // Insert a bogus lookup row. + exec(t, conn, "insert into t1_id2_idx(id2, keyspace_id) values(6, 'aaa')") + // Insert 3 rows: + // first row will insert without changing lookup. + // second will insert and change lookup. + // third will be a fresh insert for main and lookup. + exec(t, conn, "begin") + exec(t, conn, "insert into t1(id1, id2) values(1,2), (3,6), (4,7)") + exec(t, conn, "commit") + qr = exec(t, conn, "select id1, id2 from t1 order by id1") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(1) INT64(2)] [INT64(2) INT64(5)] [INT64(3) INT64(6)] [INT64(4) INT64(7)]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + qr = exec(t, conn, "select * from t1_id2_idx where id2=6") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(6) VARBINARY(\"N\\xb1\\x90ɢ\\xfa\\x16\\x9c\")]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + qr = exec(t, conn, "select count(*) from t1_id2_idx") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(5)]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + exec(t, conn, "delete from t1 where id1=1") + exec(t, conn, "delete from t1 where id1=2") + exec(t, conn, "delete from t1 where id1=3") + exec(t, conn, "delete from t1 where id1=4") + exec(t, conn, "delete from t1_id2_idx where id2=4") +} + +func TestHashLookupMultiInsertIgnore(t *testing.T) { + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + // conn2 is for queries that target shards. + conn2, err := mysql.Connect(ctx, &vtParams) + if err != nil { + t.Fatal(err) + } + defer conn2.Close() + + // DB should start out clean + qr := exec(t, conn, "select count(*) from t2_id4_idx") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(0)]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + qr = exec(t, conn, "select count(*) from t2") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(0)]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + + // Try inserting a bunch of ids at once + exec(t, conn, "begin") + exec(t, conn, "insert ignore into t2(id3, id4) values(50,60), (30,40), (10,20)") + exec(t, conn, "commit") + + // Verify + qr = exec(t, conn, "select id3, id4 from t2 order by id3") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(10) INT64(20)] [INT64(30) INT64(40)] [INT64(50) INT64(60)]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + qr = exec(t, conn, "select id3, id4 from t2_id4_idx order by id3") + if got, want := fmt.Sprintf("%v", qr.Rows), "[[INT64(10) INT64(20)] [INT64(30) INT64(40)] [INT64(50) INT64(60)]]"; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } +} + +func exec(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result { + t.Helper() + qr, err := conn.ExecuteFetch(query, 1000, true) + if err != nil { + t.Fatal(err) + } + return qr +} diff --git a/go/test/endtoend/vtgate/main_test.go b/go/test/endtoend/vtgate/main_test.go new file mode 100644 index 00000000000..d707b0d511d --- /dev/null +++ b/go/test/endtoend/vtgate/main_test.go @@ -0,0 +1,203 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vtgate + +import ( + "flag" + "os" + "testing" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/test/endtoend/cluster" +) + +var ( + clusterInstance *cluster.LocalProcessCluster + vtParams mysql.ConnParams + KeyspaceName = "ks" + Cell = "test" + SchemaSQL = `create table t1( + id1 bigint, + id2 bigint, + primary key(id1) +) Engine=InnoDB; + +create table t1_id2_idx( + id2 bigint, + keyspace_id varbinary(10), + primary key(id2) +) Engine=InnoDB; + +create table vstream_test( + id bigint, + val bigint, + primary key(id) +) Engine=InnoDB; + +create table aggr_test( + id bigint, + val1 varchar(16), + val2 bigint, + primary key(id) +) Engine=InnoDB; + +create table t2( + id3 bigint, + id4 bigint, + primary key(id3) +) Engine=InnoDB; + +create table t2_id4_idx( + id bigint not null auto_increment, + id4 bigint, + id3 bigint, + primary key(id), + key idx_id4(id4) +) Engine=InnoDB; +` + + VSchema = ` + { + "sharded": true, + "vindexes": { + "hash": { + "type": "hash" + }, + "t1_id2_vdx": { + "type": "consistent_lookup_unique", + "params": { + "table": "t1_id2_idx", + "from": "id2", + "to": "keyspace_id" + }, + "owner": "t1" + }, + "t2_id4_idx": { + "type": "lookup_hash", + "params": { + "table": "t2_id4_idx", + "from": "id4", + "to": "id3", + "autocommit": "true" + }, + "owner": "t2" + } + }, + "tables": { + "t1": { + "column_vindexes": [ + { + "column": "id1", + "name": "hash" + }, + { + "column": "id2", + "name": "t1_id2_vdx" + } + ] + }, + "t1_id2_idx": { + "column_vindexes": [ + { + "column": "id2", + "name": "hash" + } + ] + }, + "t2": { + "column_vindexes": [ + { + "column": "id3", + "name": "hash" + }, + { + "column": "id4", + "name": "t2_id4_idx" + } + ] + }, + "t2_id4_idx": { + "column_vindexes": [ + { + "column": "id4", + "name": "hash" + } + ] + }, + "vstream_test": { + "column_vindexes": [ + { + "column": "id", + "name": "hash" + } + ] + }, + "aggr_test": { + "column_vindexes": [ + { + "column": "id", + "name": "hash" + } + ], + "columns": [ + { + "name": "val1", + "type": "VARCHAR" + } + ] + } + } +}` +) + +func TestMain(m *testing.M) { + flag.Parse() + + exitCode := func() int { + clusterInstance = cluster.NewCluster(Cell, "localhost") + defer clusterInstance.Teardown() + + // Start topo server + err := clusterInstance.StartTopo() + if err != nil { + return 1 + } + + // Start keyspace + keyspace := &cluster.Keyspace{ + Name: KeyspaceName, + SchemaSQL: SchemaSQL, + VSchema: VSchema, + } + err = clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 1, true) + if err != nil { + return 1 + } + + // Start vtgate + err = clusterInstance.StartVtgate() + if err != nil { + return 1 + } + vtParams = mysql.ConnParams{ + Host: clusterInstance.Hostname, + Port: clusterInstance.VtgateMySQLPort, + } + return m.Run() + }() + os.Exit(exitCode) +} diff --git a/go/test/endtoend/vtgate/sequence/seq_test.go b/go/test/endtoend/vtgate/sequence/seq_test.go new file mode 100644 index 00000000000..72ce77bea02 --- /dev/null +++ b/go/test/endtoend/vtgate/sequence/seq_test.go @@ -0,0 +1,174 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sequence + +import ( + "context" + "flag" + "fmt" + "os" + "strings" + "testing" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/test/endtoend/cluster" +) + +var ( + clusterInstance *cluster.LocalProcessCluster + keyspaceName = "ks" + cell = "zone1" + hostname = "localhost" + sqlSchema = ` + create table sequence_test( + id bigint, + val varchar(16), + primary key(id) + )Engine=InnoDB; + + create table sequence_test_seq ( + id int default 0, + next_id bigint default null, + cache bigint default null, + primary key(id) + ) comment 'vitess_sequence' Engine=InnoDB; + ` + + vSchema = ` + { + "sharded":false, + "vindexes": { + "hash_index": { + "type": "hash" + } + }, + "tables": { + "sequence_test":{ + "auto_increment":{ + "column" : "id", + "sequence" : "sequence_test_seq" + }, + "column_vindexes": [ + { + "column": "id", + "name": "hash_index" + } + ] + }, + "sequence_test_seq": { + "type": "sequence" + } + } + } + ` +) + +func TestMain(m *testing.M) { + flag.Parse() + + exitCode := func() int { + clusterInstance = cluster.NewCluster(cell, hostname) + defer clusterInstance.Teardown() + + // Start topo server + if err := clusterInstance.StartTopo(); err != nil { + return 1 + } + + // Start keyspace + keyspace := &cluster.Keyspace{ + Name: keyspaceName, + SchemaSQL: sqlSchema, + VSchema: vSchema, + } + if err := clusterInstance.StartUnshardedKeyspace(*keyspace, 1, false); err != nil { + return 1 + } + + // Start vtgate + if err := clusterInstance.StartVtgate(); err != nil { + return 1 + } + + return m.Run() + }() + os.Exit(exitCode) +} + +func exec(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result { + t.Helper() + qr, err := conn.ExecuteFetch(query, 1000, true) + if err != nil { + t.Fatal(err) + } + return qr +} + +func TestSeq(t *testing.T) { + ctx := context.Background() + vtParams := mysql.ConnParams{ + Host: "localhost", + Port: clusterInstance.VtgateMySQLPort, + } + conn, err := mysql.Connect(ctx, &vtParams) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + //Initialize seq table + exec(t, conn, "insert into sequence_test_seq(id, next_id, cache) values(0,1,10)") + + //Insert 4 values in the main table + exec(t, conn, "insert into sequence_test(val) values('a'), ('b') ,('c'), ('d')") + + // Test select calls to main table and verify expected id. + qr := exec(t, conn, "select id, val from sequence_test where id=4") + if got, want := fmt.Sprintf("%v", qr.Rows), `[[INT64(4) VARCHAR("d")]]`; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + + // Test next available seq id from cache + qr = exec(t, conn, "select next 1 values from sequence_test_seq") + if got, want := fmt.Sprintf("%v", qr.Rows), `[[INT64(5)]]`; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + + //Test next_id from seq table which should be the increased by cache value(id+cache) + qr = exec(t, conn, "select next_id from sequence_test_seq") + if got, want := fmt.Sprintf("%v", qr.Rows), `[[INT64(11)]]`; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + + // Test insert with no auto-inc + exec(t, conn, "insert into sequence_test(id, val) values(6, 'f')") + qr = exec(t, conn, "select * from sequence_test") + if got, want := fmt.Sprintf("%v", qr.Rows), `[[INT64(1) VARCHAR("a")] [INT64(2) VARCHAR("b")] [INT64(3) VARCHAR("c")] [INT64(4) VARCHAR("d")] [INT64(6) VARCHAR("f")]]`; got != want { + t.Errorf("select:\n%v want\n%v", got, want) + } + + //Next insert will fail as we have corrupted the sequence + exec(t, conn, "begin") + _, err = conn.ExecuteFetch("insert into sequence_test(val) values('g')", 1000, false) + exec(t, conn, "rollback") + want := "Duplicate entry" + if err == nil || !strings.Contains(err.Error(), want) { + t.Errorf("wrong insert: %v, must contain %s", err, want) + } + +} diff --git a/go/test/endtoend/vtgate/transaction/trxn_mode_test.go b/go/test/endtoend/vtgate/transaction/trxn_mode_test.go new file mode 100644 index 00000000000..bf20e10dd9b --- /dev/null +++ b/go/test/endtoend/vtgate/transaction/trxn_mode_test.go @@ -0,0 +1,228 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package transaction + +import ( + "context" + "flag" + "fmt" + "os" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/test/endtoend/cluster" +) + +var ( + clusterInstance *cluster.LocalProcessCluster + vtParams mysql.ConnParams + keyspaceName = "ks" + cell = "zone1" + hostname = "localhost" + sqlSchema = ` + create table twopc_user ( + user_id bigint, + name varchar(128), + primary key (user_id) + ) Engine=InnoDB; + + create table twopc_lookup ( + name varchar(128), + id bigint, + primary key (id) + ) Engine=InnoDB;` + + vSchema = ` + { + "sharded":true, + "vindexes": { + "hash_index": { + "type": "hash" + }, + "twopc_lookup_vdx": { + "type": "lookup_hash_unique", + "params": { + "table": "twopc_lookup", + "from": "name", + "to": "id", + "autocommit": "true" + }, + "owner": "twopc_user" + } + }, + "tables": { + "twopc_user":{ + "column_vindexes": [ + { + "column": "user_id", + "name": "hash_index" + }, + { + "column": "name", + "name": "twopc_lookup_vdx" + } + ] + }, + "twopc_lookup": { + "column_vindexes": [ + { + "column": "id", + "name": "hash_index" + } + ] + } + } + } + ` +) + +func TestMain(m *testing.M) { + flag.Parse() + + exitcode, err := func() (int, error) { + clusterInstance = cluster.NewCluster(cell, hostname) + defer clusterInstance.Teardown() + + // Reserve vtGate port in order to pass it to vtTablet + clusterInstance.VtgateGrpcPort = clusterInstance.GetAndReservePort() + // Set extra tablet args for twopc + clusterInstance.VtTabletExtraArgs = []string{ + "-twopc_enable", + "-twopc_coordinator_address", fmt.Sprintf("localhost:%d", clusterInstance.VtgateGrpcPort), + "-twopc_abandon_age", "3600", + } + + // Start topo server + if err := clusterInstance.StartTopo(); err != nil { + return 1, err + } + + // Start keyspace + keyspace := &cluster.Keyspace{ + Name: keyspaceName, + SchemaSQL: sqlSchema, + VSchema: vSchema, + } + if err := clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 1, false); err != nil { + return 1, err + } + + // Starting Vtgate in SINGLE transaction mode + clusterInstance.VtGateExtraArgs = []string{"-transaction_mode", "SINGLE"} + if err := clusterInstance.StartVtgate(); err != nil { + return 1, err + } + vtParams = mysql.ConnParams{ + Host: clusterInstance.Hostname, + Port: clusterInstance.VtgateMySQLPort, + } + + return m.Run(), nil + }() + if err != nil { + fmt.Printf("%v\n", err) + os.Exit(1) + } else { + os.Exit(exitcode) + } +} + +func exec(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result { + t.Helper() + qr, err := conn.ExecuteFetch(query, 1000, true) + if err != nil { + t.Fatal(err) + } + return qr +} + +// TestTransactionModes tests trasactions using twopc mode +func TestTransactionModes(t *testing.T) { + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + // Insert targeted to multiple tables should fail as Transaction mode is SINGLE + exec(t, conn, "begin") + exec(t, conn, "insert into twopc_user(user_id, name) values(1,'john')") + _, err = conn.ExecuteFetch("insert into twopc_user(user_id, name) values(6,'vick')", 1000, false) + exec(t, conn, "rollback") + want := "multi-db transaction attempted" + if err == nil || !strings.Contains(err.Error(), want) { + t.Errorf("multi-db insert: %v, must contain %s", err, want) + } + + // Enable TWOPC transaction mode + clusterInstance.VtGateExtraArgs = []string{"-transaction_mode", "TWOPC"} + // Restart VtGate + if err = clusterInstance.ReStartVtgate(); err != nil { + t.Errorf("Fail to re-start vtgate with new config: %v", err) + } + + // Make a new mysql connection to vtGate + vtParams = mysql.ConnParams{ + Host: clusterInstance.Hostname, + Port: clusterInstance.VtgateMySQLPort, + } + conn2, err := mysql.Connect(ctx, &vtParams) + if err != nil { + t.Fatal(err) + } + defer conn2.Close() + + // Insert targeted to multiple db should PASS with TWOPC trx mode + exec(t, conn2, "begin") + exec(t, conn2, "insert into twopc_user(user_id, name) values(3,'mark')") + exec(t, conn2, "insert into twopc_user(user_id, name) values(4,'doug')") + exec(t, conn2, "insert into twopc_lookup(name, id) values('Tim',7)") + exec(t, conn2, "commit") + + // Verify the values are present + qr := exec(t, conn2, "select user_id from twopc_user where name='mark'") + got := fmt.Sprintf("%v", qr.Rows) + want = `[[INT64(3)]]` + assert.Equal(t, want, got) + + qr = exec(t, conn2, "select name from twopc_lookup where id=3") + got = fmt.Sprintf("%v", qr.Rows) + want = `[[VARCHAR("mark")]]` + assert.Equal(t, want, got) + + // DELETE from multiple tables using TWOPC transaction mode + exec(t, conn2, "begin") + exec(t, conn2, "delete from twopc_user where user_id = 3") + exec(t, conn2, "delete from twopc_lookup where id = 3") + exec(t, conn2, "commit") + + // VERIFY that values are deleted + qr = exec(t, conn2, "select user_id from twopc_user where user_id=3") + got = fmt.Sprintf("%v", qr.Rows) + want = `[]` + assert.Equal(t, want, got) + + qr = exec(t, conn2, "select name from twopc_lookup where id=3") + got = fmt.Sprintf("%v", qr.Rows) + want = `[]` + assert.Equal(t, want, got) +} diff --git a/go/test/endtoend/vtgate/vschema/vschema_test.go b/go/test/endtoend/vtgate/vschema/vschema_test.go new file mode 100644 index 00000000000..bcab68351dc --- /dev/null +++ b/go/test/endtoend/vtgate/vschema/vschema_test.go @@ -0,0 +1,169 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vschema + +import ( + "context" + "flag" + "fmt" + "os" + "testing" + + "github.com/stretchr/testify/assert" + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/test/endtoend/cluster" +) + +var ( + clusterInstance *cluster.LocalProcessCluster + vtParams mysql.ConnParams + hostname = "localhost" + keyspaceName = "ks" + cell = "zone1" + sqlSchema = ` + create table vt_user ( + id bigint, + name varchar(64), + primary key (id) + ) Engine=InnoDB; + + create table main ( + id bigint, + val varchar(128), + primary key(id) + ) Engine=InnoDB; +` +) + +func TestMain(m *testing.M) { + flag.Parse() + + exitcode, err := func() (int, error) { + clusterInstance = cluster.NewCluster(cell, hostname) + defer clusterInstance.Teardown() + + // Start topo server + if err := clusterInstance.StartTopo(); err != nil { + return 1, err + } + + // List of users authorized to execute vschema ddl operations + clusterInstance.VtGateExtraArgs = []string{"-vschema_ddl_authorized_users=%"} + + // Start keyspace + keyspace := &cluster.Keyspace{ + Name: keyspaceName, + SchemaSQL: sqlSchema, + } + if err := clusterInstance.StartUnshardedKeyspace(*keyspace, 1, false); err != nil { + return 1, err + } + + // Start vtgate + if err := clusterInstance.StartVtgate(); err != nil { + return 1, err + } + vtParams = mysql.ConnParams{ + Host: clusterInstance.Hostname, + Port: clusterInstance.VtgateMySQLPort, + } + return m.Run(), nil + }() + if err != nil { + fmt.Printf("%v\n", err) + os.Exit(1) + } else { + os.Exit(exitcode) + } + +} + +func TestVSchema(t *testing.T) { + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + // Test the empty database with no vschema + exec(t, conn, "insert into vt_user (id,name) values(1,'test1'), (2,'test2'), (3,'test3'), (4,'test4')") + + qr := exec(t, conn, "select id, name from vt_user order by id") + got := fmt.Sprintf("%v", qr.Rows) + want := `[[INT64(1) VARCHAR("test1")] [INT64(2) VARCHAR("test2")] [INT64(3) VARCHAR("test3")] [INT64(4) VARCHAR("test4")]]` + assert.Equal(t, want, got) + + qr = exec(t, conn, "delete from vt_user") + got = fmt.Sprintf("%v", qr.Rows) + want = `[]` + assert.Equal(t, want, got) + + // Test empty vschema + qr = exec(t, conn, "SHOW VSCHEMA TABLES") + got = fmt.Sprintf("%v", qr.Rows) + want = `[[VARCHAR("dual")]]` + assert.Equal(t, want, got) + + // Use the DDL to create an unsharded vschema and test again + + // Create VSchema and do a Select to force update VSCHEMA + exec(t, conn, "begin") + exec(t, conn, "ALTER VSCHEMA ADD TABLE vt_user") + exec(t, conn, "select * from vt_user") + exec(t, conn, "commit") + + exec(t, conn, "begin") + exec(t, conn, "ALTER VSCHEMA ADD TABLE main") + exec(t, conn, "select * from main") + exec(t, conn, "commit") + + // Test Showing Tables + qr = exec(t, conn, "SHOW VSCHEMA TABLES") + got = fmt.Sprintf("%v", qr.Rows) + want = `[[VARCHAR("dual")] [VARCHAR("main")] [VARCHAR("vt_user")]]` + assert.Equal(t, want, got) + + // Test Showing Vindexes + qr = exec(t, conn, "SHOW VSCHEMA VINDEXES") + got = fmt.Sprintf("%v", qr.Rows) + want = `[]` + assert.Equal(t, want, got) + + // Test DML operations + exec(t, conn, "insert into vt_user (id,name) values(1,'test1'), (2,'test2'), (3,'test3'), (4,'test4')") + qr = exec(t, conn, "select id, name from vt_user order by id") + got = fmt.Sprintf("%v", qr.Rows) + want = `[[INT64(1) VARCHAR("test1")] [INT64(2) VARCHAR("test2")] [INT64(3) VARCHAR("test3")] [INT64(4) VARCHAR("test4")]]` + assert.Equal(t, want, got) + + qr = exec(t, conn, "delete from vt_user") + got = fmt.Sprintf("%v", qr.Rows) + want = `[]` + assert.Equal(t, want, got) + +} + +func exec(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result { + t.Helper() + qr, err := conn.ExecuteFetch(query, 1000, true) + if err != nil { + t.Fatal(err) + } + return qr +} diff --git a/go/vt/binlog/binlogplayer/binlog_player.go b/go/vt/binlog/binlogplayer/binlog_player.go index fea1e002a45..d7614759a2c 100644 --- a/go/vt/binlog/binlogplayer/binlog_player.go +++ b/go/vt/binlog/binlogplayer/binlog_player.go @@ -509,7 +509,7 @@ func AlterVReplicationTable() []string { // SetVReplicationState updates the state in the _vt.vreplication table. func SetVReplicationState(dbClient DBClient, uid uint32, state, message string) error { - query := fmt.Sprintf("update _vt.vreplication set state='%v', message=%v where id=%v", state, encodeString(message), uid) + query := fmt.Sprintf("update _vt.vreplication set state='%v', message=%v where id=%v", state, encodeString(MessageTruncate(message)), uid) if _, err := dbClient.ExecuteFetch(query, 1); err != nil { return fmt.Errorf("could not set state: %v: %v", query, err) } @@ -614,7 +614,7 @@ func StartVReplicationUntil(uid uint32, pos string) string { func StopVReplication(uid uint32, message string) string { return fmt.Sprintf( "update _vt.vreplication set state='%v', message=%v where id=%v", - BlpStopped, encodeString(message), uid) + BlpStopped, encodeString(MessageTruncate(message)), uid) } // DeleteVReplication returns a statement to delete the replication. @@ -622,6 +622,15 @@ func DeleteVReplication(uid uint32) string { return fmt.Sprintf("delete from _vt.vreplication where id=%v", uid) } +// MessageTruncate truncates the message string to a safe length. +func MessageTruncate(msg string) string { + // message length is 1000 bytes. + if len(msg) > 950 { + return msg[:950] + "..." + } + return msg +} + func encodeString(in string) string { buf := bytes.NewBuffer(nil) sqltypes.NewVarChar(in).EncodeSQL(buf) diff --git a/go/vt/mysqlctl/backupstorage/interface.go b/go/vt/mysqlctl/backupstorage/interface.go index 03e7b64949c..e4c2e6bc18d 100644 --- a/go/vt/mysqlctl/backupstorage/interface.go +++ b/go/vt/mysqlctl/backupstorage/interface.go @@ -30,6 +30,10 @@ var ( // BackupStorageImplementation is the implementation to use // for BackupStorage. Exported for test purposes. BackupStorageImplementation = flag.String("backup_storage_implementation", "", "which implementation to use for the backup storage feature") + // FileSizeUnknown is a special value indicating that the file size is not known. + // This is typically used while creating a file programmatically, where it is + // impossible to compute the final size on disk ahead of time. + FileSizeUnknown = int64(-1) ) // BackupHandle describes an individual backup. @@ -50,7 +54,9 @@ type BackupHandle interface { // The context is valid for the duration of the writes, until the // WriteCloser is closed. // filesize should not be treated as an exact value but rather - // as an approximate value + // as an approximate value. + // A filesize of -1 should be treated as a special value indicating that + // the file size is unknown. AddFile(ctx context.Context, filename string, filesize int64) (io.WriteCloser, error) // EndBackup stops and closes a backup. The contents should be kept. diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go index 5f8af216dc0..f49f8fdf780 100644 --- a/go/vt/mysqlctl/builtinbackupengine.go +++ b/go/vt/mysqlctl/builtinbackupengine.go @@ -303,7 +303,7 @@ func (be *BuiltinBackupEngine) backupFiles(ctx context.Context, params BackupPar } // open the MANIFEST - wc, err := bh.AddFile(ctx, backupManifestFileName, 0) + wc, err := bh.AddFile(ctx, backupManifestFileName, backupstorage.FileSizeUnknown) if err != nil { return vterrors.Wrapf(err, "cannot add %v to backup", backupManifestFileName) } diff --git a/go/vt/mysqlctl/capabilityset.go b/go/vt/mysqlctl/capabilityset.go index 909eec70ac3..1b0855e3c1c 100644 --- a/go/vt/mysqlctl/capabilityset.go +++ b/go/vt/mysqlctl/capabilityset.go @@ -46,6 +46,9 @@ func (c *capabilitySet) hasMySQLUpgradeInServer() bool { func (c *capabilitySet) hasInitializeInServer() bool { return c.isMySQLLike() && c.version.atLeast(serverVersion{Major: 5, Minor: 7, Patch: 0}) } +func (c *capabilitySet) hasMaria104InstallDb() bool { + return c.isMariaDB() && c.version.atLeast(serverVersion{Major: 10, Minor: 4, Patch: 0}) +} // IsMySQLLike tests if the server is either MySQL // or Percona Server. At least currently, Vitess doesn't @@ -53,3 +56,6 @@ func (c *capabilitySet) hasInitializeInServer() bool { func (c *capabilitySet) isMySQLLike() bool { return c.flavor == flavorMySQL || c.flavor == flavorPercona } +func (c *capabilitySet) isMariaDB() bool { + return c.flavor == flavorMariaDB +} diff --git a/go/vt/mysqlctl/cephbackupstorage/ceph.go b/go/vt/mysqlctl/cephbackupstorage/ceph.go index 29b76e1b15e..80c37ceb828 100644 --- a/go/vt/mysqlctl/cephbackupstorage/ceph.go +++ b/go/vt/mysqlctl/cephbackupstorage/ceph.go @@ -88,7 +88,8 @@ func (bh *CephBackupHandle) AddFile(ctx context.Context, filename string, filesi // Give PutObject() the read end of the pipe. object := objName(bh.dir, bh.name, filename) - _, err := bh.client.PutObjectWithContext(ctx, bucket, object, reader, -1, minio.PutObjectOptions{ContentType: "application/octet-stream"}) + // If filesize is unknown, the caller should pass in -1 and we will pass it through. + _, err := bh.client.PutObjectWithContext(ctx, bucket, object, reader, filesize, minio.PutObjectOptions{ContentType: "application/octet-stream"}) if err != nil { // Signal the writer that an error occurred, in case it's not done writing yet. reader.CloseWithError(err) diff --git a/go/vt/mysqlctl/mysqld.go b/go/vt/mysqlctl/mysqld.go index 3402829441d..84c000306c2 100644 --- a/go/vt/mysqlctl/mysqld.go +++ b/go/vt/mysqlctl/mysqld.go @@ -742,6 +742,9 @@ func (mysqld *Mysqld) installDataDir(cnf *Mycnf) error { "--defaults-file=" + cnf.path, "--basedir=" + mysqlBaseDir, } + if mysqld.capabilities.hasMaria104InstallDb() { + args = append(args, "--auth-root-authentication-method=normal") + } cmdPath, err := binaryPath(mysqlRoot, "mysql_install_db") if err != nil { return err diff --git a/go/vt/mysqlctl/xtrabackupengine.go b/go/vt/mysqlctl/xtrabackupengine.go index fc9e27e2523..08d18638bce 100644 --- a/go/vt/mysqlctl/xtrabackupengine.go +++ b/go/vt/mysqlctl/xtrabackupengine.go @@ -158,7 +158,7 @@ func (be *XtrabackupEngine) ExecuteBackup(ctx context.Context, params BackupPara // open the MANIFEST params.Logger.Infof("Writing backup MANIFEST") - mwc, err := bh.AddFile(ctx, backupManifestFileName, 0) + mwc, err := bh.AddFile(ctx, backupManifestFileName, backupstorage.FileSizeUnknown) if err != nil { return false, vterrors.Wrapf(err, "cannot add %v to backup", backupManifestFileName) } diff --git a/go/vt/topo/keyspace.go b/go/vt/topo/keyspace.go index a22d2b4f5f4..7eb63e19f96 100755 --- a/go/vt/topo/keyspace.go +++ b/go/vt/topo/keyspace.go @@ -18,7 +18,6 @@ package topo import ( "path" - "sync" "github.com/golang/protobuf/proto" "golang.org/x/net/context" @@ -26,7 +25,6 @@ import ( "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/event" - "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/topo/events" @@ -228,30 +226,16 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string) } result := make(map[string]*ShardInfo, len(shards)) - wg := sync.WaitGroup{} - mu := sync.Mutex{} - rec := concurrency.FirstErrorRecorder{} for _, shard := range shards { - wg.Add(1) - go func(shard string) { - defer wg.Done() - si, err := ts.GetShard(ctx, keyspace, shard) - if err != nil { - if IsErrType(err, NoNode) { - log.Warningf("GetShard(%v, %v) returned ErrNoNode, consider checking the topology.", keyspace, shard) - } else { - rec.RecordError(vterrors.Wrapf(err, "GetShard(%v, %v) failed", keyspace, shard)) - } - return + si, err := ts.GetShard(ctx, keyspace, shard) + if err != nil { + if IsErrType(err, NoNode) { + log.Warningf("GetShard(%v, %v) returned ErrNoNode, consider checking the topology.", keyspace, shard) + } else { + vterrors.Wrapf(err, "GetShard(%v, %v) failed", keyspace, shard) } - mu.Lock() - result[shard] = si - mu.Unlock() - }(shard) - } - wg.Wait() - if rec.HasErrors() { - return nil, rec.Error() + } + result[shard] = si } return result, nil } diff --git a/go/vt/topo/memorytopo/file.go b/go/vt/topo/memorytopo/file.go index f7dda922e48..ddeba947e97 100644 --- a/go/vt/topo/memorytopo/file.go +++ b/go/vt/topo/memorytopo/file.go @@ -168,7 +168,7 @@ func (c *Conn) Delete(ctx context.Context, filePath string, version topo.Version // Check if it's a directory. if n.isDirectory() { //lint:ignore ST1005 Delete is a function name - return fmt.Errorf("Delete(%v, %v) failed: it's a directory", c.cell, filePath) + return fmt.Errorf("delete(%v, %v) failed: it's a directory", c.cell, filePath) } // Check the version. diff --git a/go/vt/topo/stats_conn_test.go b/go/vt/topo/stats_conn_test.go index 5148be26f48..501268092ea 100644 --- a/go/vt/topo/stats_conn_test.go +++ b/go/vt/topo/stats_conn_test.go @@ -67,7 +67,7 @@ func (st *fakeConn) Get(ctx context.Context, filePath string) (bytes []byte, ver // Delete is part of the Conn interface func (st *fakeConn) Delete(ctx context.Context, filePath string, version Version) (err error) { if filePath == "error" { - return fmt.Errorf("Dummy error") + return fmt.Errorf("dummy error") } return err } @@ -75,7 +75,7 @@ func (st *fakeConn) Delete(ctx context.Context, filePath string, version Version // Lock is part of the Conn interface func (st *fakeConn) Lock(ctx context.Context, dirPath, contents string) (lock LockDescriptor, err error) { if dirPath == "error" { - return lock, fmt.Errorf("Dummy error") + return lock, fmt.Errorf("dummy error") } return lock, err @@ -89,7 +89,7 @@ func (st *fakeConn) Watch(ctx context.Context, filePath string) (current *WatchD // NewMasterParticipation is part of the Conn interface func (st *fakeConn) NewMasterParticipation(name, id string) (mp MasterParticipation, err error) { if name == "error" { - return mp, fmt.Errorf("Dummy error") + return mp, fmt.Errorf("dummy error") } return mp, err diff --git a/go/vt/vtctl/query.go b/go/vt/vtctl/query.go index b6f068fedc5..3a97e961aaf 100644 --- a/go/vt/vtctl/query.go +++ b/go/vt/vtctl/query.go @@ -196,7 +196,7 @@ func commandVtGateExecute(ctx context.Context, wr *wrangler.Wrangler, subFlags * qr, err := session.Execute(ctx, subFlags.Arg(0), bindVars) if err != nil { //lint:ignore ST1005 function name - return fmt.Errorf("Execute failed: %v", err) + return fmt.Errorf("execute failed: %v", err) } if *json { return printJSON(wr.Logger(), qr) @@ -445,7 +445,7 @@ func commandVtTabletExecute(ctx context.Context, wr *wrangler.Wrangler, subFlags }, subFlags.Arg(1), bindVars, int64(*transactionID), executeOptions) if err != nil { //lint:ignore ST1005 function name - return fmt.Errorf("Execute failed: %v", err) + return fmt.Errorf("execute failed: %v", err) } if *json { return printJSON(wr.Logger(), qr) diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index d8cd3d18e75..0048d12fc30 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -314,7 +314,7 @@ var commands = []commandGroup{ " ", "Start the VerticalSplitClone process to perform vertical resharding. Example: SplitClone from_ks to_ks 'a,/b.*/'"}, {"VDiff", commandVDiff, - "-workflow= [-source_cell=] [-target_cell=] [-tablet_types=REPLICA] [-filtered_replication_wait_time=30s]", + "[-source_cell=] [-target_cell=] [-tablet_types=replica] [-filtered_replication_wait_time=30s] ", "Perform a diff of all tables in the workflow"}, {"MigrateServedTypes", commandMigrateServedTypes, "[-cells=c1,c2,...] [-reverse] [-skip-refresh-state] ", @@ -323,10 +323,10 @@ var commands = []commandGroup{ "[-cells=c1,c2,...] [-reverse] ", "Makes the serve the given type. This command also rebuilds the serving graph."}, {"MigrateReads", commandMigrateReads, - "[-cells=c1,c2,...] [-reverse] -workflow=workflow ", + "[-cells=c1,c2,...] [-reverse] -tablet_type={replica|rdonly} ", "Migrate read traffic for the specified workflow."}, {"MigrateWrites", commandMigrateWrites, - "[-filtered_replication_wait_time=30s] [-cancel] [-reverse_replication=false] -workflow=workflow ", + "[-filtered_replication_wait_time=30s] [-cancel] [-reverse_replication=false] ", "Migrate write traffic for the specified workflow."}, {"CancelResharding", commandCancelResharding, "", @@ -1811,7 +1811,6 @@ func commandVerticalSplitClone(ctx context.Context, wr *wrangler.Wrangler, subFl } func commandVDiff(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { - workflow := subFlags.String("workflow", "", "Specifies the workflow name") sourceCell := subFlags.String("source_cell", "", "The source cell to compare from") targetCell := subFlags.String("target_cell", "", "The target cell to compare with") tabletTypes := subFlags.String("tablet_types", "", "Tablet types for source and target") @@ -1819,16 +1818,28 @@ func commandVDiff(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.Fla if err := subFlags.Parse(args); err != nil { return err } + if subFlags.NArg() != 1 { - return fmt.Errorf("the is required") + return fmt.Errorf(" is required") + } + keyspace, workflow, err := splitKeyspaceWorkflow(subFlags.Arg(0)) + if err != nil { + return err } - targetKeyspace := subFlags.Arg(0) - _, err := wr.VDiff(ctx, targetKeyspace, *workflow, *sourceCell, *targetCell, *tabletTypes, *filteredReplicationWaitTime, + _, err = wr.VDiff(ctx, keyspace, workflow, *sourceCell, *targetCell, *tabletTypes, *filteredReplicationWaitTime, *HealthCheckTopologyRefresh, *HealthcheckRetryDelay, *HealthCheckTimeout) return err } +func splitKeyspaceWorkflow(in string) (keyspace, workflow string, err error) { + splits := strings.Split(in, ".") + if len(splits) != 2 { + return "", "", fmt.Errorf("invalid format for : %s", in) + } + return splits[0], splits[1], nil +} + func commandMigrateServedTypes(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { cellsStr := subFlags.String("cells", "", "Specifies a comma-separated list of cells to update") reverse := subFlags.Bool("reverse", false, "Moves the served tablet type backward instead of forward.") @@ -1889,16 +1900,15 @@ func commandMigrateServedFrom(ctx context.Context, wr *wrangler.Wrangler, subFla func commandMigrateReads(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { reverse := subFlags.Bool("reverse", false, "Moves the served tablet type backward instead of forward.") cellsStr := subFlags.String("cells", "", "Specifies a comma-separated list of cells to update") - workflow := subFlags.String("workflow", "", "Specifies the workflow name") + tabletType := subFlags.String("tablet_type", "", "Tablet type (replica or rdonly)") if err := subFlags.Parse(args); err != nil { return err } - if subFlags.NArg() != 2 { - return fmt.Errorf("the and arguments are required for the MigrateReads command") - } - keyspace := subFlags.Arg(0) - servedType, err := parseTabletType(subFlags.Arg(2), []topodatapb.TabletType{topodatapb.TabletType_REPLICA, topodatapb.TabletType_RDONLY}) + if *tabletType == "" { + return fmt.Errorf("-tablet_type must be specified") + } + servedType, err := parseTabletType(*tabletType, []topodatapb.TabletType{topodatapb.TabletType_REPLICA, topodatapb.TabletType_RDONLY}) if err != nil { return err } @@ -1910,29 +1920,34 @@ func commandMigrateReads(ctx context.Context, wr *wrangler.Wrangler, subFlags *f if *reverse { direction = wrangler.DirectionBackward } - if *workflow == "" { - return fmt.Errorf("a -workflow=workflow argument is required") + if subFlags.NArg() != 1 { + return fmt.Errorf(" is required") + } + keyspace, workflow, err := splitKeyspaceWorkflow(subFlags.Arg(0)) + if err != nil { + return err } - return wr.MigrateReads(ctx, keyspace, *workflow, servedType, cells, direction) + + return wr.MigrateReads(ctx, keyspace, workflow, servedType, cells, direction) } func commandMigrateWrites(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { filteredReplicationWaitTime := subFlags.Duration("filtered_replication_wait_time", 30*time.Second, "Specifies the maximum time to wait, in seconds, for filtered replication to catch up on master migrations. The migration will be aborted on timeout.") reverseReplication := subFlags.Bool("reverse_replication", true, "Also reverse the replication") cancelMigrate := subFlags.Bool("cancel", false, "Cancel the failed migration and serve from source") - workflow := subFlags.String("workflow", "", "Specifies the workflow name") if err := subFlags.Parse(args); err != nil { return err } + if subFlags.NArg() != 1 { - return fmt.Errorf("the argument is required for the MigrateWrites command") + return fmt.Errorf(" is required") } - - keyspace := subFlags.Arg(0) - if *workflow == "" { - return fmt.Errorf("a -workflow=workflow argument is required") + keyspace, workflow, err := splitKeyspaceWorkflow(subFlags.Arg(0)) + if err != nil { + return err } - journalID, err := wr.MigrateWrites(ctx, keyspace, *workflow, *filteredReplicationWaitTime, *cancelMigrate, *reverseReplication) + + journalID, err := wr.MigrateWrites(ctx, keyspace, workflow, *filteredReplicationWaitTime, *cancelMigrate, *reverseReplication) if err != nil { return err } diff --git a/go/vt/vtgate/vindexes/xxhash.go b/go/vt/vtgate/vindexes/xxhash.go new file mode 100644 index 00000000000..48677a736f5 --- /dev/null +++ b/go/vt/vtgate/vindexes/xxhash.go @@ -0,0 +1,88 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vindexes + +import ( + "bytes" + "encoding/binary" + + "github.com/cespare/xxhash/v2" + + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/key" +) + +var ( + _ Vindex = (*XXHash)(nil) +) + +// XXHash defines vindex that hashes any sql types to a KeyspaceId +// by using xxhash64. It's Unique and works on any platform giving identical result. +type XXHash struct { + name string +} + +// NewXXHash creates a new XXHash. +func NewXXHash(name string, m map[string]string) (Vindex, error) { + return &XXHash{name: name}, nil +} + +// String returns the name of the vindex. +func (vind *XXHash) String() string { + return vind.name +} + +// Cost returns the cost of this index as 1. +func (vind *XXHash) Cost() int { + return 1 +} + +// IsUnique returns true since the Vindex is unique. +func (vind *XXHash) IsUnique() bool { + return true +} + +// Map can map ids to key.Destination objects. +func (vind *XXHash) Map(cursor VCursor, ids []sqltypes.Value) ([]key.Destination, error) { + out := make([]key.Destination, len(ids)) + for i := range ids { + id := ids[i].ToBytes() + out[i] = key.DestinationKeyspaceID(vXXHash(id)) + } + return out, nil +} + +// Verify returns true if ids maps to ksids. +func (vind *XXHash) Verify(_ VCursor, ids []sqltypes.Value, ksids [][]byte) ([]bool, error) { + out := make([]bool, len(ids)) + for i := range ids { + id := ids[i].ToBytes() + out[i] = bytes.Equal(vXXHash(id), ksids[i]) + } + return out, nil +} + +func init() { + Register("xxhash", NewXXHash) +} + +func vXXHash(shardKey []byte) []byte { + var hashed [8]byte + hashKey := xxhash.Sum64(shardKey) + binary.LittleEndian.PutUint64(hashed[:], hashKey) + return hashed[:] +} diff --git a/go/vt/vtgate/vindexes/xxhash_test.go b/go/vt/vtgate/vindexes/xxhash_test.go new file mode 100644 index 00000000000..36c1719443c --- /dev/null +++ b/go/vt/vtgate/vindexes/xxhash_test.go @@ -0,0 +1,143 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vindexes + +import ( + "bytes" + "fmt" + "reflect" + "strings" + "testing" + + "github.com/cespare/xxhash/v2" + + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/key" +) + +var xxHash Vindex + +func init() { + hv, err := CreateVindex("xxhash", "xxhash_name", map[string]string{"Table": "t", "Column": "c"}) + if err != nil { + panic(err) + } + xxHash = hv +} + +func TestXXHashCost(t *testing.T) { + if xxHash.Cost() != 1 { + t.Errorf("Cost(): %d, want 1", xxHash.Cost()) + } +} + +func TestXXHashString(t *testing.T) { + if strings.Compare("xxhash_name", xxHash.String()) != 0 { + t.Errorf("String(): %s, want xxhash_name", xxHash.String()) + } +} + +func TestXXHashMap(t *testing.T) { + tcases := []struct { + in sqltypes.Value + out []byte + }{{ + in: sqltypes.NewVarChar("test1"), + out: []byte{0xd0, 0x1a, 0xb7, 0xe4, 0xd6, 0x97, 0x8f, 0xb}, + }, { + in: sqltypes.NewVarChar("test2"), + out: []byte{0x87, 0xeb, 0x11, 0x71, 0x4c, 0xa, 0xe, 0x89}, + }, { + in: sqltypes.NewInt64(1), + out: []byte{0xd4, 0x64, 0x5, 0x36, 0x76, 0x12, 0xb4, 0xb7}, + }, { + in: sqltypes.NULL, + out: []byte{0x99, 0xe9, 0xd8, 0x51, 0x37, 0xdb, 0x46, 0xef}, + }, { + in: sqltypes.NewInt64(-1), + out: []byte{0xd8, 0xe2, 0xa6, 0xa7, 0xc8, 0xc7, 0x62, 0x3d}, + }, { + in: sqltypes.NewUint64(18446744073709551615), + out: []byte{0x47, 0x7c, 0xfa, 0x8d, 0x6d, 0x8f, 0x1f, 0x8d}, + }, { + in: sqltypes.NewInt64(9223372036854775807), + out: []byte{0xb3, 0x7e, 0xb0, 0x1f, 0x7b, 0xff, 0xaf, 0xd8}, + }, { + in: sqltypes.NewUint64(9223372036854775807), + out: []byte{0xb3, 0x7e, 0xb0, 0x1f, 0x7b, 0xff, 0xaf, 0xd8}, + }, { + in: sqltypes.NewInt64(-9223372036854775808), + out: []byte{0x10, 0x2c, 0x27, 0xdd, 0xb2, 0x6a, 0x60, 0x9e}, + }} + + for _, tcase := range tcases { + got, err := xxHash.Map(nil, []sqltypes.Value{tcase.in}) + if err != nil { + t.Error(err) + } + out := []byte(got[0].(key.DestinationKeyspaceID)) + if !bytes.Equal(tcase.out, out) { + t.Errorf("Map(%#v): %#v, want %#v", tcase.in, out, tcase.out) + } + } +} + +func TestXXHashVerify(t *testing.T) { + ids := []sqltypes.Value{sqltypes.NewUint64(1), sqltypes.NewUint64(2)} + ksids := [][]byte{{0xd4, 0x64, 0x5, 0x36, 0x76, 0x12, 0xb4, 0xb7}, {0xd4, 0x64, 0x5, 0x36, 0x76, 0x12, 0xb4, 0xb7}} + got, err := xxHash.Verify(nil, ids, ksids) + if err != nil { + t.Fatal(err) + } + want := []bool{true, false} + if !reflect.DeepEqual(got, want) { + t.Errorf("xxHash.Verify: %v, want %v", got, want) + } +} + +func BenchmarkXXHash(b *testing.B) { + for _, benchSize := range []struct { + name string + n int + }{ + {"8B", 8}, + {"64B", 64}, + {"512B", 512}, + {"1KB", 1e3}, + {"4KB", 4e3}, + } { + input := make([]byte, benchSize.n) + for i := range input { + input[i] = byte(i) + } + + name := fmt.Sprintf("xxHash,direct,bytes,n=%s", benchSize.name) + b.Run(name, func(b *testing.B) { + benchmarkHashBytes(b, input) + }) + + } +} + +var sink uint64 + +func benchmarkHashBytes(b *testing.B, input []byte) { + b.SetBytes(int64(len(input))) + for i := 0; i < b.N; i++ { + sink = xxhash.Sum64(input) + } +} diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go index 2dc86792fc7..014db54d9d4 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go @@ -165,7 +165,7 @@ func (vr *vreplicator) setMessage(message string) error { Time: time.Now(), Message: message, }) - query := fmt.Sprintf("update _vt.vreplication set message=%v where id=%v", encodeString(message), vr.id) + query := fmt.Sprintf("update _vt.vreplication set message=%v where id=%v", encodeString(binlogplayer.MessageTruncate(message)), vr.id) if _, err := vr.dbClient.Execute(query); err != nil { return fmt.Errorf("could not set message: %v: %v", query, err) } diff --git a/go/vt/vttablet/tabletserver/tabletserver_test.go b/go/vt/vttablet/tabletserver/tabletserver_test.go index b8fd1a3d204..6e385dcf79f 100644 --- a/go/vt/vttablet/tabletserver/tabletserver_test.go +++ b/go/vt/vttablet/tabletserver/tabletserver_test.go @@ -962,7 +962,7 @@ func TestTabletServerBeginFail(t *testing.T) { defer cancel() tsv.Begin(ctx, &target, nil) _, err = tsv.Begin(ctx, &target, nil) - want := "transaction pool connection limit exceeded" + want := "transaction pool aborting request due to already expired context" if err == nil || err.Error() != want { t.Fatalf("Begin err: %v, want %v", err, want) } diff --git a/go/vt/vttablet/tabletserver/tx_pool.go b/go/vt/vttablet/tabletserver/tx_pool.go index e7ecfcc513d..42de72c3f0d 100644 --- a/go/vt/vttablet/tabletserver/tx_pool.go +++ b/go/vt/vttablet/tabletserver/tx_pool.go @@ -245,6 +245,9 @@ func (axp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions) ( switch err { case connpool.ErrConnPoolClosed: return 0, "", err + case pools.ErrCtxTimeout: + axp.LogActive() + return 0, "", vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "transaction pool aborting request due to already expired context") case pools.ErrTimeout: axp.LogActive() return 0, "", vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "transaction pool connection limit exceeded") diff --git a/go/vt/vttablet/tabletserver/tx_pool_test.go b/go/vt/vttablet/tabletserver/tx_pool_test.go index cdd11cac5a0..eccb7030064 100644 --- a/go/vt/vttablet/tabletserver/tx_pool_test.go +++ b/go/vt/vttablet/tabletserver/tx_pool_test.go @@ -470,6 +470,25 @@ func TestTxPoolBeginWithError(t *testing.T) { } } +func TestTxPoolCancelledContextError(t *testing.T) { + db := fakesqldb.New(t) + defer db.Close() + db.AddRejectedQuery("begin", errRejected) + txPool := newTxPool() + txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams()) + defer txPool.Close() + ctx, cancel := context.WithCancel(context.Background()) + cancel() + _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}) + want := "transaction pool aborting request due to already expired context" + if err == nil || !strings.Contains(err.Error(), want) { + t.Errorf("Unexpected error: %v, want %s", err, want) + } + if got, want := vterrors.Code(err), vtrpcpb.Code_RESOURCE_EXHAUSTED; got != want { + t.Errorf("wrong error code error: got = %v, want = %v", got, want) + } +} + func TestTxPoolRollbackFail(t *testing.T) { sql := "alter table test_table add test_column int" db := fakesqldb.New(t) diff --git a/go/vt/workflow/long_polling.go b/go/vt/workflow/long_polling.go index 85463a71a7d..007472a2b7b 100644 --- a/go/vt/workflow/long_polling.go +++ b/go/vt/workflow/long_polling.go @@ -249,7 +249,7 @@ func (m *Manager) HandleHTTPLongPolling(pattern string) { ctx := context.TODO() if err := m.NodeManager().Action(ctx, ap); err != nil { - return fmt.Errorf("Action failed: %v", err) + return fmt.Errorf("action failed: %v", err) } http.Error(w, "", http.StatusOK) return nil diff --git a/go/vt/workflow/node.go b/go/vt/workflow/node.go index 7043370aeef..7895a3ba321 100644 --- a/go/vt/workflow/node.go +++ b/go/vt/workflow/node.go @@ -441,7 +441,7 @@ func (m *NodeManager) Action(ctx context.Context, ap *ActionParameters) error { if n.Listener == nil { m.mu.Unlock() - return fmt.Errorf("Action %v is invoked on a node without listener (node path is %v)", ap.Name, ap.Path) + return fmt.Errorf("action %v is invoked on a node without listener (node path is %v)", ap.Name, ap.Path) } nodeListener := n.Listener m.mu.Unlock() diff --git a/java/hadoop/src/main/java/io/vitess/hadoop/README.md b/java/hadoop/src/main/java/io/vitess/hadoop/README.md index fe88bdc650e..9e4a083bd4c 100644 --- a/java/hadoop/src/main/java/io/vitess/hadoop/README.md +++ b/java/hadoop/src/main/java/io/vitess/hadoop/README.md @@ -14,7 +14,7 @@ primary key (id)) Engine=InnoDB; Let's say we want to write a MapReduce job that imports this table from Vitess to HDFS where each row is turned into a CSV record in HDFS. -We can use [VitessInputFormat](https://github.com/vitessio/vitess/blob/master/java/hadoop/src/main/java/io/vitess/hadoop/VitessInputFormat.java), an implementation of Hadoop's [InputFormat](https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapred/InputFormat.html), for that. With VitessInputFormat, rows from the source table are streamed to the mapper task. Each input record has a [NullWritable](https://hadoop.apache.org/docs/r2.2.0/api/org/apache/hadoop/io/NullWritable.html) key (no key, really), and [RowWritable](https://github.com/vitessio/vitess/blob/master/java/hadoop/src/main/java/io/vitess/hadoop/RowWritable.java) as value, which is a writable implementation for the entire row's contents. +We can use [VitessInputFormat](https://github.com/vitessio/vitess/blob/master/java/hadoop/src/main/java/io/vitess/hadoop/VitessInputFormat.java), an implementation of Hadoop's [InputFormat](https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapred/InputFormat.html), for that. With VitessInputFormat, rows from the source table are streamed to the mapper task. Each input record has a [NullWritable](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/NullWritable.html) key (no key, really), and [RowWritable](https://github.com/vitessio/vitess/blob/master/java/hadoop/src/main/java/io/vitess/hadoop/RowWritable.java) as value, which is a writable implementation for the entire row's contents. Here is an example implementation of our mapper, which transforms each row into a CSV Text. diff --git a/java/pom.xml b/java/pom.xml index 2e35714fa49..49c3ec314a3 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -236,7 +236,7 @@ org.apache.maven.plugins maven-compiler-plugin - 3.8.0 + 3.8.1 1.8 1.8 diff --git a/misc/git/hooks/shellcheck b/misc/git/hooks/shellcheck index 7833f2442fd..af9d8f8b37b 100755 --- a/misc/git/hooks/shellcheck +++ b/misc/git/hooks/shellcheck @@ -9,19 +9,22 @@ if [ -z "$shfiles" ] ; then exit 0 fi -# The -e SC1090,SC1091 suppressing warnings about trying to find -# files imported with "source foo.sh". We only want to lint -# the files modified as part of this current diff. -if errors=$(shellcheck -e SC1090,SC1091 "$shfiles" 2>&1); then - # No lint errors. Return early. - exit 0 -fi - if [ -z "$(command -v shellcheck)" ]; then echo "shellcheck not found, please run: brew or apt-get install shellcheck" exit 0 fi +errors= +for file in $shfiles +do + # The -e SC1090,SC1091 suppressing warnings about trying to find + # files imported with "source foo.sh". We only want to lint + # the files modified as part of this current diff. + errors+=$(shellcheck -e SC1090,SC1091 "$file" 2>&1) +done + +[ -z "$errors" ] && exit 0 + # git doesn't give us access to user input, so let's steal it. if exec < /dev/tty; then # interactive shell. Prompt the user. diff --git a/test/config.json b/test/config.json index c303b4a6398..603e772de11 100644 --- a/test/config.json +++ b/test/config.json @@ -422,6 +422,18 @@ "RetryMax": 0, "Tags": [] }, + "cluster_endtoend": { + "File": "", + "Args": [], + "Command": [ + "make", + "e2e_test_cluster" + ], + "Manual": false, + "Shard": 5, + "RetryMax": 0, + "Tags": [] + }, "e2e_race": { "File": "", "Args": [], diff --git a/tools/e2e_test_cluster.sh b/tools/e2e_test_cluster.sh new file mode 100755 index 00000000000..d9ca94f6557 --- /dev/null +++ b/tools/e2e_test_cluster.sh @@ -0,0 +1,35 @@ +#!/bin/bash + +# Copyright 2019 The Vitess Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# These test uses excutables and launch them as process +# After that all tests run, here we are testing those + +# All Go packages with test files. +# Output per line: * +packages_with_tests=$(go list -f '{{if len .TestGoFiles}}{{.ImportPath}} {{join .TestGoFiles " "}}{{end}}' ./go/.../endtoend/... | sort) + +cluster_tests=$(echo "$packages_with_tests" | grep -E "go/test/endtoend" | cut -d" " -f1) + +# Run cluster test sequentially +echo "running cluster tests $cluster_tests" +echo "$cluster_tests" | xargs go test -v -p=1 +if [ $? -ne 0 ]; then + echo "ERROR: Go cluster tests failed. See above for errors." + echo + echo "This should NOT happen. Did you introduce a flaky unit test?" + echo "If so, please rename it to the suffix _flaky_test.go." + exit 1 +fi diff --git a/tools/e2e_test_race.sh b/tools/e2e_test_race.sh index d66a427a5ff..f3a31ac3af8 100755 --- a/tools/e2e_test_race.sh +++ b/tools/e2e_test_race.sh @@ -34,6 +34,7 @@ export GO111MODULE=on # All endtoend Go packages with test files. # Output per line: * packages_with_tests=$(go list -f '{{if len .TestGoFiles}}{{.ImportPath}} {{join .TestGoFiles " "}}{{end}}' ./go/.../endtoend/... | sort) +packages_with_tests=$(echo "$packages_with_tests" | grep -vE "go/test/endtoend" | cut -d" " -f1) # endtoend tests should be in a directory called endtoend all_e2e_tests=$(echo "$packages_with_tests" | cut -d" " -f1) diff --git a/tools/e2e_test_runner.sh b/tools/e2e_test_runner.sh index 4f7822d18ef..e2b9da256ad 100755 --- a/tools/e2e_test_runner.sh +++ b/tools/e2e_test_runner.sh @@ -1,13 +1,13 @@ #!/bin/bash # Copyright 2019 The Vitess Authors. -# +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -38,11 +38,11 @@ fi packages_with_tests=$(go list -f '{{if len .TestGoFiles}}{{.ImportPath}} {{join .TestGoFiles " "}}{{end}}' ./go/.../endtoend/... | sort) # Flaky tests have the suffix "_flaky_test.go". -all_except_flaky_tests=$(echo "$packages_with_tests" | grep -vE ".+ .+_flaky_test\.go" | cut -d" " -f1) -flaky_tests=$(echo "$packages_with_tests" | grep -E ".+ .+_flaky_test\.go" | cut -d" " -f1) +all_except_flaky_and_cluster_tests=$(echo "$packages_with_tests" | grep -vE ".+ .+_flaky_test\.go" | grep -vE "go/test/endtoend" | cut -d" " -f1) +flaky_tests=$(echo "$packages_with_tests" | grep -E ".+ .+_flaky_test\.go" | grep -vE "go/test/endtoend" | cut -d" " -f1) # Run non-flaky tests. -echo "$all_except_flaky_tests" | xargs go test $VT_GO_PARALLEL +echo "$all_except_flaky_and_cluster_tests" | xargs go test $VT_GO_PARALLEL if [ $? -ne 0 ]; then echo "ERROR: Go unit tests failed. See above for errors." echo