diff --git a/go/test/endtoend/cluster/vtgate_process.go b/go/test/endtoend/cluster/vtgate_process.go index 39ddafb7880..3129f05bf58 100644 --- a/go/test/endtoend/cluster/vtgate_process.go +++ b/go/test/endtoend/cluster/vtgate_process.go @@ -55,6 +55,7 @@ type VtgateProcess struct { MySQLAuthServerImpl string Directory string VerifyURL string + VSchemaURL string SysVarSetEnabled bool PlannerVersion planbuilder.PlannerVersion //Extra Args to be set before starting the vtgate process @@ -251,6 +252,7 @@ func VtgateProcessInstance( } vtgate.VerifyURL = fmt.Sprintf("http://%s:%d/debug/vars", hostname, port) + vtgate.VSchemaURL = fmt.Sprintf("http://%s:%d/debug/vschema", hostname, port) return vtgate } diff --git a/go/test/endtoend/vtgate/schematracker/restarttablet/schema_restart_test.go b/go/test/endtoend/vtgate/schematracker/restarttablet/schema_restart_test.go index 44fa98320e5..0616552a73f 100644 --- a/go/test/endtoend/vtgate/schematracker/restarttablet/schema_restart_test.go +++ b/go/test/endtoend/vtgate/schematracker/restarttablet/schema_restart_test.go @@ -18,10 +18,13 @@ package schematracker import ( "context" + "encoding/json" "flag" "fmt" + "net/http" "os" "testing" + "time" "github.com/stretchr/testify/require" @@ -38,6 +41,7 @@ var ( hostname = "localhost" keyspaceName = "ks" cell = "zone1" + signalInterval = 1 sqlSchema = ` create table vt_user ( id bigint, @@ -86,7 +90,10 @@ func TestMain(m *testing.M) { // restart the tablet so that the schema.Engine gets a chance to start with existing schema tablet := clusterInstance.Keyspaces[0].Shards[0].PrimaryTablet() - tablet.VttabletProcess.ExtraArgs = []string{"-queryserver-config-schema-change-signal"} + tablet.VttabletProcess.ExtraArgs = []string{ + "-queryserver-config-schema-change-signal", + fmt.Sprintf("-queryserver-config-schema-change-signal-interval=%d", signalInterval), + } if err := tablet.RestartOnlyTablet(); err != nil { return 1 } @@ -118,6 +125,35 @@ func TestVSchemaTrackerInit(t *testing.T) { 
assert.Equal(t, want, got) } +// TestVSchemaTrackerKeyspaceReInit tests that the vschema tracker +// properly handles primary tablet restarts -- meaning that we maintain +// the exact same vschema state as before the restart. +func TestVSchemaTrackerKeyspaceReInit(t *testing.T) { + defer cluster.PanicHandler(t) + + primaryTablet := clusterInstance.Keyspaces[0].Shards[0].PrimaryTablet() + + // get the vschema prior to the restarts + var originalResults interface{} + readVSchema(t, &clusterInstance.VtgateProcess, &originalResults) + assert.NotNil(t, originalResults) + + // restart the primary tablet so that the vschema gets reloaded for the keyspace + for i := 0; i < 5; i++ { + err := primaryTablet.VttabletProcess.TearDownWithTimeout(30 * time.Second) + require.NoError(t, err) + err = primaryTablet.VttabletProcess.Setup() + require.NoError(t, err) + err = clusterInstance.WaitForTabletsToHealthyInVtgate() + require.NoError(t, err) + time.Sleep(time.Duration(signalInterval*2) * time.Second) + var newResults interface{} + readVSchema(t, &clusterInstance.VtgateProcess, &newResults) + assert.Equal(t, originalResults, newResults) + newResults = nil + } +} + func exec(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result { t.Helper() qr, err := conn.ExecuteFetch(query, 1000, true) @@ -126,3 +162,11 @@ func exec(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result { } return qr } + +func readVSchema(t *testing.T, vtgate *cluster.VtgateProcess, results *interface{}) { + httpClient := &http.Client{Timeout: 5 * time.Second} + resp, err := httpClient.Get(vtgate.VSchemaURL) + require.Nil(t, err) + assert.Equal(t, 200, resp.StatusCode) + json.NewDecoder(resp.Body).Decode(results) +} diff --git a/go/vt/vtgate/schema/tracker.go b/go/vt/vtgate/schema/tracker.go index cf221a159e8..d6bf2830496 100644 --- a/go/vt/vtgate/schema/tracker.go +++ b/go/vt/vtgate/schema/tracker.go @@ -84,9 +84,14 @@ func (t *Tracker) LoadKeyspace(conn queryservice.QueryService, target 
*querypb.T } t.mu.Lock() defer t.mu.Unlock() + // We must clear out any previous schema before loading it here as this is called + // whenever a shard's primary tablet starts and sends the initial signal. Without + // clearing out the previous schema we can end up with duplicate entries when the + // tablet is simply restarted or potentially when we elect a new primary. + t.clearKeyspaceTables(target.Keyspace) t.updateTables(target.Keyspace, res) t.tracked[target.Keyspace].setLoaded(true) - log.Infof("finished loading schema for keyspace %s. Found %d tables", target.Keyspace, len(res.Rows)) + log.Infof("finished loading schema for keyspace %s. Found %d columns in total across the tables", target.Keyspace, len(res.Rows)) return nil } @@ -130,7 +135,7 @@ func (t *Tracker) newUpdateController() *updateController { func (t *Tracker) initKeyspace(th *discovery.TabletHealth) error { err := t.LoadKeyspace(th.Conn, th.Target) if err != nil { - log.Warningf("Unable to add keyspace to tracker: %v", err) + log.Warningf("Unable to add the %s keyspace to the schema tracker: %v", th.Target.Keyspace, err) return err } return nil @@ -255,3 +260,12 @@ func (tm *tableMap) delete(ks, tbl string) { } delete(m, tbl) } + +// This empties out any previous schema for all tables in a keyspace. +// You should call this before initializing/loading a keyspace of the same +// name in the cache. 
+func (t *Tracker) clearKeyspaceTables(ks string) { + if t.tables != nil && t.tables.m != nil { + delete(t.tables.m, ks) + } +} diff --git a/go/vt/vtgate/schema/uptate_controller.go b/go/vt/vtgate/schema/update_controller.go similarity index 100% rename from go/vt/vtgate/schema/uptate_controller.go rename to go/vt/vtgate/schema/update_controller.go diff --git a/go/vt/vtgate/schema/update_controller_flaky_test.go b/go/vt/vtgate/schema/update_controller_flaky_test.go index 2bb98754675..5a2d5df9cc6 100644 --- a/go/vt/vtgate/schema/update_controller_flaky_test.go +++ b/go/vt/vtgate/schema/update_controller_flaky_test.go @@ -124,7 +124,7 @@ func TestMultipleUpdatesFromDifferentShards(t *testing.T) { signal := func() { signalNb++ } - kUpdate := updateController{ + updateCont := updateController{ update: update, signal: signal, consumeDelay: 5 * time.Millisecond, @@ -159,13 +159,13 @@ func TestMultipleUpdatesFromDifferentShards(t *testing.T) { if test.delay > 0 { time.Sleep(test.delay) } - kUpdate.add(d) + updateCont.add(d) } for { - kUpdate.mu.Lock() - done := kUpdate.queue == nil - kUpdate.mu.Unlock() + updateCont.mu.Lock() + done := updateCont.queue == nil + updateCont.mu.Unlock() if done { break }