Skip to content

Commit

Permalink
watch vschema in drop database to confirm that the keyspace is no lon…
Browse files Browse the repository at this point in the history
…ger available to be served

Signed-off-by: Harshit Gangal <harshit@planetscale.com>
  • Loading branch information
harshit-gangal committed Mar 11, 2021
1 parent e762d58 commit 6a7f295
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 50 deletions.
96 changes: 65 additions & 31 deletions go/test/endtoend/vtgate/createdb_plugin/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -90,37 +92,47 @@ func TestDBDDLPlugin(t *testing.T) {
require.NoError(t, err)
defer conn.Close()

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
qr := exec(t, conn, `create database aaa`)
require.EqualValues(t, 1, qr.RowsAffected)
}()
time.Sleep(300 * time.Millisecond)
start(t, "aaa")

// wait until the create database query has returned
wg.Wait()

exec(t, conn, `use aaa`)
exec(t, conn, `create table t (id bigint primary key)`)
exec(t, conn, `insert into t(id) values (1),(2),(3),(4),(5)`)
assertMatches(t, conn, "select count(*) from t", `[[INT64(5)]]`)

wg.Add(1)
go func() {
defer wg.Done()
_ = exec(t, conn, `drop database aaa`)
}()
time.Sleep(300 * time.Millisecond)
shutdown(t, "aaa")

// wait until the drop database query has returned
wg.Wait()

_, err = conn.ExecuteFetch(`select count(*) from t`, 1000, true)
require.Error(t, err)
createAndDrop := func(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
qr := exec(t, conn, `create database aaa`)
require.EqualValues(t, 1, qr.RowsAffected)
}()
time.Sleep(300 * time.Millisecond)
start(t, "aaa")

// wait until the create database query has returned
wg.Wait()

exec(t, conn, `use aaa`)
exec(t, conn, `create table t (id bigint primary key)`)
exec(t, conn, `insert into t(id) values (1),(2),(3),(4),(5)`)
assertMatches(t, conn, "select count(*) from t", `[[INT64(5)]]`)

wg.Add(1)
go func() {
defer wg.Done()
_ = exec(t, conn, `drop database aaa`)
}()
time.Sleep(300 * time.Millisecond)
shutdown(t, "aaa")

// wait until the drop database query has returned
wg.Wait()

_, err = conn.ExecuteFetch(`select count(*) from t`, 1000, true)
require.Error(t, err)
}
t.Run("first try", func(t *testing.T) {
createAndDrop(t)
})
if !t.Failed() {
t.Run("second try", func(t *testing.T) {
createAndDrop(t)
})
}
}

func start(t *testing.T, ksName string) {
Expand All @@ -133,8 +145,30 @@ func start(t *testing.T, ksName string) {
}

func shutdown(t *testing.T, ksName string) {
for _, ks := range clusterInstance.Keyspaces {
if ks.Name != ksName {
continue
}
for _, shard := range ks.Shards {
for _, tablet := range shard.Vttablets {
if tablet.MysqlctlProcess.TabletUID > 0 {
_, err := tablet.MysqlctlProcess.StopProcess()
assert.NoError(t, err)
}
if tablet.MysqlctldProcess.TabletUID > 0 {
err := tablet.MysqlctldProcess.Stop()
assert.NoError(t, err)
}
_ = tablet.VttabletProcess.TearDown()
}
}
}

require.NoError(t,
clusterInstance.VtctlclientProcess.ExecuteCommand("DeleteKeyspace", "-recursive", ksName))

require.NoError(t,
clusterInstance.VtctlclientProcess.ExecuteCommand("RebuildVSchemaGraph"))
}

func exec(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result {
Expand Down
15 changes: 5 additions & 10 deletions go/vt/vtgate/engine/dbddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (c *DBDDL) createDatabase(vcursor VCursor, plugin DBDDLPlugin) (*sqltypes.R
}
select {
case <-ctx.Done(): //context cancelled
return nil, vterrors.Errorf(vtrpc.Code_DEADLINE_EXCEEDED, "could not validate created database")
return nil, vterrors.Errorf(vtrpc.Code_DEADLINE_EXCEEDED, "could not validate create database: destination not resolved")
case <-time.After(500 * time.Millisecond): //timeout
}
}
Expand All @@ -150,7 +150,7 @@ func (c *DBDDL) createDatabase(vcursor VCursor, plugin DBDDLPlugin) (*sqltypes.R
noErr = false
select {
case <-ctx.Done(): //context cancelled
return nil, vterrors.Errorf(vtrpc.Code_DEADLINE_EXCEEDED, "could not validate created database")
return nil, vterrors.Errorf(vtrpc.Code_DEADLINE_EXCEEDED, "could not validate create database: tablets not healthy")
case <-time.After(500 * time.Millisecond): //timeout
}
break
Expand All @@ -169,19 +169,14 @@ func (c *DBDDL) dropDatabase(vcursor VCursor, plugin DBDDLPlugin) (*sqltypes.Res
if err != nil {
return nil, err
}
for {
// loop until we do not find the keyspace to resolve.
_, _, err = vcursor.ResolveDestinations(c.name, nil, []key.Destination{})
if err != nil && strings.Contains(err.Error(), "node doesn't exist") {
break
}

for vcursor.KeyspaceAvailable(c.name) {
select {
case <-ctx.Done(): //context cancelled
return nil, vterrors.Errorf(vtrpc.Code_DEADLINE_EXCEEDED, "could not validate drop database")
return nil, vterrors.Errorf(vtrpc.Code_DEADLINE_EXCEEDED, "could not validate drop database: keyspace still available in vschema")
case <-time.After(500 * time.Millisecond): //timeout
}
}

return &sqltypes.Result{StatusFlags: sqltypes.ServerStatusDbDropped}, nil
}

Expand Down
19 changes: 10 additions & 9 deletions go/vt/vtgate/engine/dbddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -57,7 +59,7 @@ func TestDBDDLDropExecute(t *testing.T) {

primitive := &DBDDL{name: "ks"}

vc := &loggingVCursor{dbDDLPlugin: pluginName}
vc := &loggingVCursor{dbDDLPlugin: pluginName, ksAvailable: false}

_, err := primitive.Execute(vc, nil, false)
require.NoError(t, err)
Expand All @@ -70,14 +72,13 @@ func TestDBDDLTimeout(t *testing.T) {
plugin := &dbddlTestFake{sleep: 2}
DBDDLRegister(pluginName, plugin)

primitive := &DBDDL{
name: "ks",
create: true,
queryTimeout: 100,
}

primitive := &DBDDL{name: "ks", create: true, queryTimeout: 100}
vc := &loggingVCursor{dbDDLPlugin: pluginName, shardErr: fmt.Errorf("db not available")}

_, err := primitive.Execute(vc, nil, false)
require.EqualError(t, err, "could not validate created database")
assert.EqualError(t, err, "could not validate create database: destination not resolved")

primitive = &DBDDL{name: "ks", queryTimeout: 100}
vc = &loggingVCursor{dbDDLPlugin: pluginName, ksAvailable: true}
_, err = primitive.Execute(vc, nil, false)
assert.EqualError(t, err, "could not validate drop database: keyspace still available in vschema")
}
9 changes: 9 additions & 0 deletions go/vt/vtgate/engine/fake_vcursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ type noopVCursor struct {
ctx context.Context
}

func (t *noopVCursor) KeyspaceAvailable(ks string) bool {
panic("implement me")
}

func (t *noopVCursor) SetDDLStrategy(strategy string) {
panic("implement me")
}
Expand Down Expand Up @@ -270,12 +274,17 @@ type loggingVCursor struct {

tableRoutes tableRoutes
dbDDLPlugin string
ksAvailable bool
}

type tableRoutes struct {
tbl *vindexes.Table
}

func (f *loggingVCursor) KeyspaceAvailable(ks string) bool {
return f.ksAvailable
}

func (f *loggingVCursor) SetFoundRows(u uint64) {
panic("implement me")
}
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vtgate/engine/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ type (

// GetDBDDLPlugin gets the configured plugin for DROP/CREATE DATABASE
GetDBDDLPluginName() string

// KeyspaceAvailable returns true when a keyspace is visible from vtgate
KeyspaceAvailable(ks string) bool
}

//SessionActions gives primitives ability to interact with the session state
Expand Down
8 changes: 8 additions & 0 deletions go/vt/vtgate/vcursor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type iExecute interface {

// TODO: remove when resolver is gone
ParseDestinationTarget(targetString string) (string, topodatapb.TabletType, key.Destination, error)
VSchema() *vindexes.VSchema
}

//VSchemaOperator is an interface to Vschema Operations
Expand Down Expand Up @@ -694,10 +695,17 @@ func (vc *vcursorImpl) HasCreatedTempTable() {
vc.safeSession.GetOrCreateOptions().HasCreatedTempTables = true
}

// GetDBDDLPluginName implements the VCursor interface
func (vc *vcursorImpl) GetDBDDLPluginName() string {
return *dbDDLPlugin
}

// KeyspaceAvailable implements the VCursor interface
func (vc *vcursorImpl) KeyspaceAvailable(ks string) bool {
_, exists := vc.executor.VSchema().Keyspaces[ks]
return exists
}

// ParseDestinationTarget parses destination target string and sets default keyspace if possible.
func parseDestinationTarget(targetString string, vschema *vindexes.VSchema) (string, topodatapb.TabletType, key.Destination, error) {
destKeyspace, destTabletType, dest, err := topoprotopb.ParseDestination(targetString, defaultTabletType)
Expand Down

0 comments on commit 6a7f295

Please sign in to comment.