Skip to content

Commit

Permalink
Merge pull request #8988 from planetscale/Keyspaces2WatchVStream
Browse files Browse the repository at this point in the history
Support VStream with keyspaces_to_watch
  • Loading branch information
deepthi authored Oct 27, 2021
2 parents ad3cc4e + 00ec04d commit d080992
Show file tree
Hide file tree
Showing 16 changed files with 381 additions and 51 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# DO NOT MODIFY: THIS FILE IS GENERATED USING "make generate_ci_workflows"

name: Cluster (vstream_with_keyspaces_to_watch)
on: [push, pull_request]
concurrency:
group: format('{0}-{1}', ${{ github.ref }}, 'Cluster (vstream_with_keyspaces_to_watch)')
cancel-in-progress: true

jobs:
build:
name: Run endtoend tests on Cluster (vstream_with_keyspaces_to_watch)
runs-on: ubuntu-18.04

steps:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.17

- name: Tune the OS
run: |
echo '1024 65535' | sudo tee -a /proc/sys/net/ipv4/ip_local_port_range
# TEMPORARY WHILE GITHUB FIXES THIS https://github.com/actions/virtual-environments/issues/3185
- name: Add the current IP address, long hostname and short hostname record to /etc/hosts file
run: |
echo -e "$(ip addr show eth0 | grep "inet\b" | awk '{print $2}' | cut -d/ -f1)\t$(hostname -f) $(hostname -s)" | sudo tee -a /etc/hosts
# DON'T FORGET TO REMOVE CODE ABOVE WHEN ISSUE IS ADRESSED!

- name: Check out code
uses: actions/checkout@v2

- name: Get dependencies
run: |
sudo apt-get update
sudo apt-get install -y mysql-server mysql-client make unzip g++ etcd curl git wget eatmydata
sudo service mysql stop
sudo service etcd stop
sudo ln -s /etc/apparmor.d/usr.sbin.mysqld /etc/apparmor.d/disable/
sudo apparmor_parser -R /etc/apparmor.d/usr.sbin.mysqld
go mod download
wget https://repo.percona.com/apt/percona-release_latest.$(lsb_release -sc)_all.deb
sudo apt-get install -y gnupg2
sudo dpkg -i percona-release_latest.$(lsb_release -sc)_all.deb
sudo apt-get update
sudo apt-get install percona-xtrabackup-24
- name: Run cluster endtoend test
timeout-minutes: 30
run: |
source build.env
eatmydata -- go run test.go -docker=false -print-log -follow -shard vstream_with_keyspaces_to_watch
3 changes: 2 additions & 1 deletion go/test/endtoend/vreplication/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ var (
vtdataroot string
mainClusterConfig *ClusterConfig
externalClusterConfig *ClusterConfig
extraVTGateArgs = []string{"-tablet_refresh_interval", "10ms"}
)

// ClusterConfig defines the parameters like ports, tmpDir, tablet types which uniquely define a vitess cluster
Expand Down Expand Up @@ -383,7 +384,7 @@ func (vc *VitessCluster) StartVtgate(t testing.TB, cell *Cell, cellsToWatch stri
vc.ClusterConfig.tabletTypes,
vc.ClusterConfig.topoPort,
vc.ClusterConfig.tmpDir,
[]string{"-tablet_refresh_interval", "10ms"},
extraVTGateArgs,
vc.ClusterConfig.vtgatePlannerVersion)
require.NotNil(t, vtgate)
if err := vtgate.Setup(); err != nil {
Expand Down
47 changes: 33 additions & 14 deletions go/test/endtoend/vreplication/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@ import (
"vitess.io/vitess/go/vt/vtgate/vtgateconn"
)

// Validates that a reparent while VStream API is streaming doesn't miss any events
// We stream only from the primary and while streaming we reparent to a replica and then back to the original primary
func TestVStreamFailover(t *testing.T) {
// Validates that we have a working VStream API
// If Failover is enabled:
// - We ensure that this works through active reparents and doesn't miss any events
// - We stream only from the primary and while streaming we reparent to a replica and then back to the original primary
func testVStreamWithFailover(t *testing.T, failover bool) {
defaultCellName := "zone1"
cells := []string{"zone1"}
allCellNames = "zone1"
vc = NewVitessCluster(t, "TestVStreamFailover", cells, mainClusterConfig)
vc = NewVitessCluster(t, "TestVStreamWithFailover", cells, mainClusterConfig)

require.NotNil(t, vc)
defaultReplicas = 2
Expand Down Expand Up @@ -139,17 +141,21 @@ func TestVStreamFailover(t *testing.T) {
tickCount++
switch tickCount {
case 1:
insertMu.Lock()
output, err := vc.VtctlClient.ExecuteCommandWithOutput("PlannedReparentShard", "-keyspace_shard=product/0", "-new_primary=zone1-101")
insertMu.Unlock()
log.Infof("output of first PRS is %s", output)
require.NoError(t, err)
if failover {
insertMu.Lock()
output, err := vc.VtctlClient.ExecuteCommandWithOutput("PlannedReparentShard", "-keyspace_shard=product/0", "-new_primary=zone1-101")
insertMu.Unlock()
log.Infof("output of first PRS is %s", output)
require.NoError(t, err)
}
case 2:
insertMu.Lock()
output, err := vc.VtctlClient.ExecuteCommandWithOutput("PlannedReparentShard", "-keyspace_shard=product/0", "-new_primary=zone1-100")
insertMu.Unlock()
log.Infof("output of second PRS is %s", output)
require.NoError(t, err)
if failover {
insertMu.Lock()
output, err := vc.VtctlClient.ExecuteCommandWithOutput("PlannedReparentShard", "-keyspace_shard=product/0", "-new_primary=zone1-100")
insertMu.Unlock()
log.Infof("output of second PRS is %s", output)
require.NoError(t, err)
}
time.Sleep(100 * time.Millisecond)
stopInserting = true
time.Sleep(2 * time.Second)
Expand All @@ -160,6 +166,7 @@ func TestVStreamFailover(t *testing.T) {
break
}
}

qr := execVtgateQuery(t, vtgateConn, "product", "select count(*) from customer")
require.NotNil(t, qr)
// total number of row events found by the VStream API should match the rows inserted
Expand Down Expand Up @@ -368,6 +375,10 @@ func testVStreamStopOnReshardFlag(t *testing.T, stopOnReshard bool, baseTabletID
return &ne
}

func TestVStreamFailover(t *testing.T) {
testVStreamWithFailover(t, true)
}

func TestVStreamStopOnReshardTrue(t *testing.T) {
ne := testVStreamStopOnReshardFlag(t, true, 1000)
require.Greater(t, ne.numJournalEvents, int64(0))
Expand All @@ -387,3 +398,11 @@ func TestVStreamStopOnReshardFalse(t *testing.T) {
require.NotZero(t, ne.numLessThan40Events)
require.NotZero(t, ne.numGreaterThan40Events)
}

func TestVStreamWithKeyspacesToWatch(t *testing.T) {
extraVTGateArgs = append(extraVTGateArgs, []string{
"-keyspaces_to_watch", "product",
}...)

testVStreamWithFailover(t, false)
}
44 changes: 40 additions & 4 deletions go/test/endtoend/vtgate/keyspace_watches/keyspace_watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ var (
msg VARCHAR(64) NOT NULL,
PRIMARY KEY (id)
) Engine=InnoDB;`
vschemaDDL = "alter vschema create vindex test_vdx using hash"
vschemaDDLError = fmt.Sprintf("Error 1105: cannot perform Update on keyspaces/%s/VSchema as the topology server connection is read-only",
keyspaceUnshardedName)
)

// createConfig creates a config file in TmpDir in vtdataroot and writes the given data.
Expand All @@ -66,7 +69,7 @@ func createConfig(clusterInstance *cluster.LocalProcessCluster, name, data strin
return err
}

func createCluster() (*cluster.LocalProcessCluster, int) {
func createCluster(extraVTGateArgs []string) (*cluster.LocalProcessCluster, int) {
clusterInstance := cluster.NewCluster(cell, hostname)

// Start topo server
Expand Down Expand Up @@ -94,11 +97,16 @@ func createCluster() (*cluster.LocalProcessCluster, int) {
return nil, 1
}

clusterInstance.VtGateExtraArgs = []string{
vtGateArgs := []string{
"-mysql_auth_server_static_file", clusterInstance.TmpDirectory + "/" + mysqlAuthServerStatic,
"-keyspaces_to_watch", "ks1",
"-keyspaces_to_watch", keyspaceUnshardedName,
}

if extraVTGateArgs != nil {
vtGateArgs = append(vtGateArgs, extraVTGateArgs...)
}
clusterInstance.VtGateExtraArgs = vtGateArgs

// Start vtgate
if err := clusterInstance.StartVtgate(); err != nil {
return nil, 1
Expand All @@ -114,7 +122,7 @@ func createCluster() (*cluster.LocalProcessCluster, int) {
func TestRoutingWithKeyspacesToWatch(t *testing.T) {
defer cluster.PanicHandler(t)

clusterInstance, exitCode := createCluster()
clusterInstance, exitCode := createCluster(nil)
defer clusterInstance.Teardown()

if exitCode != 0 {
Expand All @@ -134,3 +142,31 @@ func TestRoutingWithKeyspacesToWatch(t *testing.T) {
_, err = db.Exec("select * from keyspaces_to_watch_test")
require.Nil(t, err)
}

func TestVSchemaDDLWithKeyspacesToWatch(t *testing.T) {
defer cluster.PanicHandler(t)

extraVTGateArgs := []string{
"-vschema_ddl_authorized_users", "%",
}
clusterInstance, exitCode := createCluster(extraVTGateArgs)
defer clusterInstance.Teardown()

if exitCode != 0 {
os.Exit(exitCode)
}

dsn := fmt.Sprintf(
"testuser1:testpassword1@tcp(%s:%v)/",
clusterInstance.Hostname,
clusterInstance.VtgateMySQLPort,
)
db, err := sql.Open("mysql", dsn)
require.Nil(t, err)
defer db.Close()

// The topo server must be read-only when using keyspaces_to_watch in order to prevent
// potentially corrupting the VSchema based on this vtgates limited view of the world
_, err = db.Exec(vschemaDDL)
require.EqualError(t, err, vschemaDDLError)
}
22 changes: 14 additions & 8 deletions go/vt/proto/vtrpc/vtrpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 9 additions & 9 deletions go/vt/srvtopo/keyspace_filtering_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,13 @@ var (
// ErrNilUnderlyingServer is returned when attempting to create a new keyspace
// filtering server if a nil underlying server implementation is provided.
ErrNilUnderlyingServer = fmt.Errorf("unable to construct filtering server without an underlying server")

// ErrTopoServerNotAvailable is returned if a caller tries to access the
// topo.Server supporting this srvtopo.Server.
ErrTopoServerNotAvailable = fmt.Errorf("cannot access underlying topology server when keyspace filtering is enabled")
)

// NewKeyspaceFilteringServer constructs a new server based on the provided
// implementation that prevents the specified keyspaces from being exposed
// to consumers of the new Server.
//
// A filtering server will not allow access to the topo.Server to prevent
// A filtering server will only allow read-only access to the topo.Server to prevent
// updates that may corrupt the global VSchema keyspace.
func NewKeyspaceFilteringServer(underlying Server, selectedKeyspaces []string) (Server, error) {
if underlying == nil {
Expand All @@ -51,8 +47,13 @@ func NewKeyspaceFilteringServer(underlying Server, selectedKeyspaces []string) (
keyspaces[ks] = true
}

readOnlyServer, err := NewReadOnlyServer(underlying)
if err != nil {
return nil, err
}

return keyspaceFilteringServer{
server: underlying,
server: readOnlyServer,
selectKeyspaces: keyspaces,
}, nil
}
Expand All @@ -62,10 +63,9 @@ type keyspaceFilteringServer struct {
selectKeyspaces map[string]bool
}

// GetTopoServer returns an error; filtering srvtopo.Server consumers may not
// access the underlying topo.Server.
// GetTopoServer returns a read-only topo server
func (ksf keyspaceFilteringServer) GetTopoServer() (*topo.Server, error) {
return nil, ErrTopoServerNotAvailable
return ksf.server.GetTopoServer()
}

func (ksf keyspaceFilteringServer) GetSrvKeyspaceNames(
Expand Down
14 changes: 10 additions & 4 deletions go/vt/srvtopo/keyspace_filtering_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,17 @@ func TestFilteringServerHandlesNilUnderlying(t *testing.T) {
func TestFilteringServerReturnsUnderlyingServer(t *testing.T) {
_, _, f := newFiltering(nil)
got, gotErr := f.GetTopoServer()
if got != nil {
t.Errorf("Got non-nil topo.Server from FilteringServer")
if gotErr != nil {
t.Errorf("Got error getting topo.Server from FilteringServer")
}

readOnly, err := got.IsReadOnly()
if err != nil || !readOnly {
t.Errorf("Got read-write topo.Server from FilteringServer -- must be read-only")
}
if gotErr != ErrTopoServerNotAvailable {
t.Errorf("Unexpected error from GetTopoServer; wanted %v but got %v", ErrTopoServerNotAvailable, gotErr)
gotErr = got.CreateCellsAlias(stockCtx, "should_fail", &topodatapb.CellsAlias{Cells: []string{stockCell}})
if gotErr == nil {
t.Errorf("Were able to perform a write against the topo.Server from a FilteringServer -- it must be read-only")
}
}

Expand Down
Loading

0 comments on commit d080992

Please sign in to comment.