Skip to content

Commit

Permalink
Merge pull request #7936 from planetscale/ds-backport-7873
Browse files Browse the repository at this point in the history
[10.0] Fix for keyspaces_to_watch regression
  • Loading branch information
askdba authored Apr 23, 2021
2 parents b8d830f + 2e289f8 commit 2ad770e
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 6 deletions.
136 changes: 136 additions & 0 deletions go/test/endtoend/vtgate/keyspace_watches/keyspace_watch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
Copyright 2021 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

/*
Test the vtgate's ability to route while watching a subset of keyspaces.
*/

package keyspacewatches

import (
"database/sql"
"fmt"
"math/rand"
"os"
"testing"
"time"

_ "github.com/go-sql-driver/mysql"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
)

var (
vtParams mysql.ConnParams
keyspaceUnshardedName = "ks1"
cell = "zone1"
hostname = "localhost"
mysqlAuthServerStatic = "mysql_auth_server_static.json"
sqlSchema = `
create table keyspaces_to_watch_test(
id BIGINT NOT NULL,
msg VARCHAR(64) NOT NULL,
PRIMARY KEY (id)
) Engine=InnoDB;`
)

// createConfig creates a config file in TmpDir in vtdataroot and writes the given data.
func createConfig(clusterInstance *cluster.LocalProcessCluster, name, data string) error {
// creating new file
f, err := os.Create(clusterInstance.TmpDirectory + "/" + name)
if err != nil {
return err
}

if data == "" {
return nil
}

// write the given data
_, err = fmt.Fprint(f, data)
return err
}

func createCluster() (*cluster.LocalProcessCluster, int) {
clusterInstance := cluster.NewCluster(cell, hostname)

// Start topo server
if err := clusterInstance.StartTopo(); err != nil {
return nil, 1
}

// create auth server config
SQLConfig := `{
"testuser1": {
"Password": "testpassword1",
"UserData": "vtgate client 1"
}
}`
if err := createConfig(clusterInstance, mysqlAuthServerStatic, SQLConfig); err != nil {
return nil, 1
}

// Start keyspace
keyspace := &cluster.Keyspace{
Name: keyspaceUnshardedName,
SchemaSQL: sqlSchema,
}
if err := clusterInstance.StartUnshardedKeyspace(*keyspace, 1, false); err != nil {
return nil, 1
}

clusterInstance.VtGateExtraArgs = []string{
"-mysql_auth_server_static_file", clusterInstance.TmpDirectory + "/" + mysqlAuthServerStatic,
"-keyspaces_to_watch", "ks1",
}

// Start vtgate
if err := clusterInstance.StartVtgate(); err != nil {
return nil, 1
}
vtParams = mysql.ConnParams{
Host: clusterInstance.Hostname,
Port: clusterInstance.VtgateMySQLPort,
}
rand.Seed(time.Now().UnixNano())
return clusterInstance, 0
}

func TestRoutingWithKeyspacesToWatch(t *testing.T) {
defer cluster.PanicHandler(t)

clusterInstance, exitCode := createCluster()
defer clusterInstance.Teardown()

if exitCode != 0 {
os.Exit(exitCode)
}

dsn := fmt.Sprintf(
"testuser1:testpassword1@tcp(%s:%v)/",
clusterInstance.Hostname,
clusterInstance.VtgateMySQLPort,
)
db, err := sql.Open("mysql", dsn)
require.Nil(t, err)
defer db.Close()

// if this returns w/o failing the test we're good to go
_, err = db.Exec("select * from keyspaces_to_watch_test")
require.Nil(t, err)
}
5 changes: 5 additions & 0 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ func init() {
flag.Var(&KeyspacesToWatch, "keyspaces_to_watch", "Specifies which keyspaces this vtgate should have access to while routing queries or accessing the vschema")
}

// FilteringKeyspaces returns true if any keyspaces have been configured to be filtered.
func FilteringKeyspaces() bool {
return len(KeyspacesToWatch) > 0
}

// TabletRecorder is a sub interface of HealthCheck.
// It is separated out to enable unit testing.
type TabletRecorder interface {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/discoverygateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func NewDiscoveryGateway(ctx context.Context, hc discovery.LegacyHealthCheck, se
}
var recorder discovery.LegacyTabletRecorder = dg.hc
if len(discovery.TabletFilters) > 0 {
if len(discovery.KeyspacesToWatch) > 0 {
if discovery.FilteringKeyspaces() {
log.Exitf("Only one of -keyspaces_to_watch and -tablet_filters may be specified at a time")
}

Expand All @@ -135,7 +135,7 @@ func NewDiscoveryGateway(ctx context.Context, hc discovery.LegacyHealthCheck, se
log.Exitf("Cannot parse tablet_filters parameter: %v", err)
}
recorder = fbs
} else if len(discovery.KeyspacesToWatch) > 0 {
} else if discovery.FilteringKeyspaces() {
recorder = discovery.NewLegacyFilterByKeyspace(recorder, discovery.KeyspacesToWatch)
}

Expand Down
11 changes: 9 additions & 2 deletions go/vt/vtgate/vcursor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"context"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/srvtopo"
Expand Down Expand Up @@ -163,7 +164,8 @@ func newVCursorImpl(
vschema *vindexes.VSchema,
resolver *srvtopo.Resolver,
serv srvtopo.Server,
warnShardedOnly bool) (*vcursorImpl, error) {
warnShardedOnly bool,
) (*vcursorImpl, error) {
keyspace, tabletType, destination, err := parseDestinationTarget(safeSession.TargetString, vschema)
if err != nil {
return nil, err
Expand All @@ -174,7 +176,9 @@ func newVCursorImpl(
return nil, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "transaction is supported only for master tablet type, current type: %v", tabletType)
}
var ts *topo.Server
if serv != nil {
// We don't have access to the underlying TopoServer if this vtgate is
// filtering keyspaces because we don't have an accurate view of the topo.
if serv != nil && !discovery.FilteringKeyspaces() {
ts, err = serv.GetTopoServer()
if err != nil {
return nil, err
Expand Down Expand Up @@ -588,6 +592,9 @@ func (vc *vcursorImpl) TabletType() topodatapb.TabletType {

// SubmitOnlineDDL implements the VCursor interface
func (vc *vcursorImpl) SubmitOnlineDDL(onlineDDl *schema.OnlineDDL) error {
if vc.topoServer == nil {
return vterrors.New(vtrpcpb.Code_INTERNAL, "Unable to apply DDL because toposerver is unavailable, ensure this vtgate is not using filtered keyspaces")
}
conn, err := vc.topoServer.ConnForCell(vc.ctx, topo.GlobalCell)
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func Init(ctx context.Context, serv srvtopo.Server, cell string, tabletTypesToWa

// If we want to filter keyspaces replace the srvtopo.Server with a
// filtering server
if len(discovery.KeyspacesToWatch) > 0 {
if discovery.FilteringKeyspaces() {
log.Infof("Keyspace filtering enabled, selecting %v", discovery.KeyspacesToWatch)
var err error
serv, err = srvtopo.NewKeyspaceFilteringServer(serv, discovery.KeyspacesToWatch)
Expand Down Expand Up @@ -505,7 +505,7 @@ func LegacyInit(ctx context.Context, hc discovery.LegacyHealthCheck, serv srvtop

// If we want to filter keyspaces replace the srvtopo.Server with a
// filtering server
if len(discovery.KeyspacesToWatch) > 0 {
if discovery.FilteringKeyspaces() {
log.Infof("Keyspace filtering enabled, selecting %v", discovery.KeyspacesToWatch)
var err error
serv, err = srvtopo.NewKeyspaceFilteringServer(serv, discovery.KeyspacesToWatch)
Expand Down

0 comments on commit 2ad770e

Please sign in to comment.