Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clear existing keyspace schema in vtgate before [re]loading it #9437

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go/test/endtoend/cluster/vtgate_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type VtgateProcess struct {
MySQLAuthServerImpl string
Directory string
VerifyURL string
VSchemaURL string
SysVarSetEnabled bool
PlannerVersion planbuilder.PlannerVersion
//Extra Args to be set before starting the vtgate process
Expand Down Expand Up @@ -251,6 +252,7 @@ func VtgateProcessInstance(
}

vtgate.VerifyURL = fmt.Sprintf("http://%s:%d/debug/vars", hostname, port)
vtgate.VSchemaURL = fmt.Sprintf("http://%s:%d/debug/vschema", hostname, port)

return vtgate
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ package schematracker

import (
"context"
"encoding/json"
"flag"
"fmt"
"net/http"
"os"
"testing"
"time"

"github.com/stretchr/testify/require"

Expand All @@ -38,6 +41,7 @@ var (
hostname = "localhost"
keyspaceName = "ks"
cell = "zone1"
signalInterval = 1
sqlSchema = `
create table vt_user (
id bigint,
Expand Down Expand Up @@ -86,7 +90,10 @@ func TestMain(m *testing.M) {

// restart the tablet so that the schema.Engine gets a chance to start with existing schema
tablet := clusterInstance.Keyspaces[0].Shards[0].PrimaryTablet()
tablet.VttabletProcess.ExtraArgs = []string{"-queryserver-config-schema-change-signal"}
tablet.VttabletProcess.ExtraArgs = []string{
"-queryserver-config-schema-change-signal",
fmt.Sprintf("-queryserver-config-schema-change-signal-interval=%d", signalInterval),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: an easier way to pass this is as two separate arguments:
[]string{..., "-queryserver-config-schema-change-signal-interval", strconv.Itoa(signalInterval), ...}

}
if err := tablet.RestartOnlyTablet(); err != nil {
return 1
}
Expand Down Expand Up @@ -118,6 +125,35 @@ func TestVSchemaTrackerInit(t *testing.T) {
assert.Equal(t, want, got)
}

// TestVSchemaTrackerKeyspaceReInit tests that the vschema tracker
// properly handles primary tablet restarts -- meaning that we maintain
// the exact same vschema state as before the restart.
//
// The test reads the vschema from vtgate once as a baseline, then restarts
// the primary tablet five times and, after each restart, asserts that the
// vschema served by vtgate is equal to that baseline.
func TestVSchemaTrackerKeyspaceReInit(t *testing.T) {
	defer cluster.PanicHandler(t)

	primaryTablet := clusterInstance.Keyspaces[0].Shards[0].PrimaryTablet()

	// Capture the vschema prior to the restarts; every post-restart read is
	// compared against this baseline.
	var originalResults interface{}
	readVSchema(t, &clusterInstance.VtgateProcess, &originalResults)
	// Stop immediately when no baseline could be read: comparing later reads
	// against a nil baseline would make the remaining checks meaningless.
	require.NotNil(t, originalResults)

	// Restart the primary tablet so that the vschema gets reloaded for the keyspace.
	for i := 0; i < 5; i++ {
		err := primaryTablet.VttabletProcess.TearDownWithTimeout(30 * time.Second)
		require.NoError(t, err)
		err = primaryTablet.VttabletProcess.Setup()
		require.NoError(t, err)
		err = clusterInstance.WaitForTabletsToHealthyInVtgate()
		require.NoError(t, err)

		// Wait two signal intervals before re-reading the vschema
		// (presumably so the restarted tablet has sent its schema-change
		// signal and vtgate has reloaded the keyspace -- confirm).
		time.Sleep(time.Duration(signalInterval*2) * time.Second)

		var newResults interface{}
		readVSchema(t, &clusterInstance.VtgateProcess, &newResults)
		assert.Equal(t, originalResults, newResults)
	}
}

func exec(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result {
t.Helper()
qr, err := conn.ExecuteFetch(query, 1000, true)
Expand All @@ -126,3 +162,11 @@ func exec(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result {
}
return qr
}

// readVSchema issues an HTTP GET against vtgate.VSchemaURL and decodes the
// JSON response body into results.
//
// The test is failed immediately when the request returns an error, when the
// response status is not 200 OK, or when the body cannot be decoded as JSON.
// The request uses a client with a 5 second timeout.
func readVSchema(t *testing.T, vtgate *cluster.VtgateProcess, results *interface{}) {
	t.Helper()

	httpClient := &http.Client{Timeout: 5 * time.Second}
	resp, err := httpClient.Get(vtgate.VSchemaURL)
	require.NoError(t, err)
	// The body must always be closed, otherwise the underlying connection
	// is leaked on every call.
	defer resp.Body.Close()

	require.Equal(t, http.StatusOK, resp.StatusCode)

	// A decode failure must fail the test; silently ignoring it would leave
	// results untouched and let callers compare stale or nil values.
	err = json.NewDecoder(resp.Body).Decode(results)
	require.NoError(t, err)
}
18 changes: 16 additions & 2 deletions go/vt/vtgate/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,14 @@ func (t *Tracker) LoadKeyspace(conn queryservice.QueryService, target *querypb.T
}
t.mu.Lock()
defer t.mu.Unlock()
// We must clear out any previous schema before loading it here as this is called
// whenever a shard's primary tablet starts and sends the initial signal. Without
// clearing out the previous schema we can end up with duplicate entries when the
// tablet is simply restarted or potentially when we elect a new primary.
t.clearKeyspaceTables(target.Keyspace)
t.updateTables(target.Keyspace, res)
t.tracked[target.Keyspace].setLoaded(true)
log.Infof("finished loading schema for keyspace %s. Found %d tables", target.Keyspace, len(res.Rows))
log.Infof("finished loading schema for keyspace %s. Found %d columns in total across the tables", target.Keyspace, len(res.Rows))
return nil
}

Expand Down Expand Up @@ -130,7 +135,7 @@ func (t *Tracker) newUpdateController() *updateController {
func (t *Tracker) initKeyspace(th *discovery.TabletHealth) error {
err := t.LoadKeyspace(th.Conn, th.Target)
if err != nil {
log.Warningf("Unable to add keyspace to tracker: %v", err)
log.Warningf("Unable to add the %s keyspace to the schema tracker: %v", th.Target.Keyspace, err)
return err
}
return nil
Expand Down Expand Up @@ -255,3 +260,12 @@ func (tm *tableMap) delete(ks, tbl string) {
}
delete(m, tbl)
}

// clearKeyspaceTables drops every table entry recorded for the keyspace ks
// from the tracker's table map. Call it before initializing/loading a
// keyspace of the same name so that no previous schema is left behind.
// It does nothing when the tracker has no table map.
func (t *Tracker) clearKeyspaceTables(ks string) {
	if t.tables == nil {
		return
	}
	// delete is a no-op on a nil map, so no separate nil check is needed.
	delete(t.tables.m, ks)
}
10 changes: 5 additions & 5 deletions go/vt/vtgate/schema/update_controller_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func TestMultipleUpdatesFromDifferentShards(t *testing.T) {
signal := func() {
signalNb++
}
kUpdate := updateController{
updateCont := updateController{
update: update,
signal: signal,
consumeDelay: 5 * time.Millisecond,
Expand Down Expand Up @@ -159,13 +159,13 @@ func TestMultipleUpdatesFromDifferentShards(t *testing.T) {
if test.delay > 0 {
time.Sleep(test.delay)
}
kUpdate.add(d)
updateCont.add(d)
}

for {
kUpdate.mu.Lock()
done := kUpdate.queue == nil
kUpdate.mu.Unlock()
updateCont.mu.Lock()
done := updateCont.queue == nil
updateCont.mu.Unlock()
if done {
break
}
Expand Down