Skip to content

Commit

Permalink
Merge pull request #6435 from planetscale/rn-schema-tracking-initial-…
Browse files Browse the repository at this point in the history
…schema

Schema tracking: initial schema insert
  • Loading branch information
sougou authored Jul 16, 2020
2 parents 1d518f2 + febb163 commit 6032234
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 104 deletions.
30 changes: 22 additions & 8 deletions go/mysql/fakesqldb/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ type DB struct {
// connections tracks all open connections.
// The key for the map is the value of mysql.Conn.ConnectionID.
connections map[uint32]*mysql.Conn

// queryPatternUserCallback stores optional callbacks when a query with a pattern is called
queryPatternUserCallback map[*regexp.Regexp]func(string)
}

// QueryHandler is the interface used by the DB to simulate executed queries
Expand Down Expand Up @@ -157,13 +160,14 @@ func New(t *testing.T) *DB {

// Create our DB.
db := &DB{
t: t,
socketFile: socketFile,
name: "fakesqldb",
data: make(map[string]*ExpectedResult),
rejectedData: make(map[string]error),
queryCalled: make(map[string]int),
connections: make(map[uint32]*mysql.Conn),
t: t,
socketFile: socketFile,
name: "fakesqldb",
data: make(map[string]*ExpectedResult),
rejectedData: make(map[string]error),
queryCalled: make(map[string]int),
connections: make(map[uint32]*mysql.Conn),
queryPatternUserCallback: make(map[*regexp.Regexp]func(string)),
}

db.Handler = db
Expand Down Expand Up @@ -344,7 +348,6 @@ func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.R
defer db.mu.Unlock()
db.queryCalled[key]++
db.querylog = append(db.querylog, key)

// Check if we should close the connection and provoke errno 2013.
if db.shouldClose {
c.Close()
Expand Down Expand Up @@ -384,6 +387,10 @@ func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.R
// Check query patterns from AddQueryPattern().
for _, pat := range db.patternData {
if pat.expr.MatchString(query) {
userCallback, ok := db.queryPatternUserCallback[pat.expr]
if ok {
userCallback(query)
}
return callback(pat.result)
}
}
Expand Down Expand Up @@ -500,6 +507,13 @@ func (db *DB) AddQueryPattern(queryPattern string, expectedResult *sqltypes.Resu
db.patternData = append(db.patternData, exprResult{expr, &result})
}

// AddQueryPatternWithCallback is similar to AddQueryPattern: in addition it calls the provided callback function
// The callback can be used to set user counters/variables for testing specific usecases
func (db *DB) AddQueryPatternWithCallback(queryPattern string, expectedResult *sqltypes.Result, callback func(string)) {
db.AddQueryPattern(queryPattern, expectedResult)
db.queryPatternUserCallback[db.patternData[len(db.patternData)-1].expr] = callback
}

// DeleteQuery deletes query from the fake DB.
func (db *DB) DeleteQuery(query string) {
db.mu.Lock()
Expand Down
12 changes: 10 additions & 2 deletions go/test/endtoend/cluster/vtctlclient_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,15 @@ type VtctlClientProcess struct {

// InitShardMaster executes vtctlclient command to make one of tablet as master
func (vtctlclient *VtctlClientProcess) InitShardMaster(Keyspace string, Shard string, Cell string, TabletUID int) (err error) {
return vtctlclient.ExecuteCommand(
output, err := vtctlclient.ExecuteCommandWithOutput(
"InitShardMaster",
"-force", "-wait_replicas_timeout", "31s",
fmt.Sprintf("%s/%s", Keyspace, Shard),
fmt.Sprintf("%s-%d", Cell, TabletUID))
if err != nil {
log.Errorf("error in InitShardMaster output %s, err %s", output, err.Error())
}
return err
}

// ApplySchema applies SQL schema to the keyspace
Expand Down Expand Up @@ -73,7 +77,11 @@ func (vtctlclient *VtctlClientProcess) ExecuteCommand(args ...string) (err error
pArgs...,
)
log.Infof("Executing vtctlclient with command: %v", strings.Join(tmpProcess.Args, " "))
return tmpProcess.Run()
output, err := tmpProcess.Output()
if err != nil {
log.Errorf("Error executing %s: output %s, err %v", strings.Join(tmpProcess.Args, " "), output, err)
}
return err
}

// ExecuteCommandWithOutput executes any vtctlclient command and returns output
Expand Down
88 changes: 0 additions & 88 deletions go/vt/vttablet/endtoend/vstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,94 +320,6 @@ func TestSchemaVersioning(t *testing.T) {
log.Info("=== END OF TEST")
}

func TestSchemaVersioningLongDDL(t *testing.T) {
// Let's disable the already running tracker to prevent it from
// picking events from the previous test, and then re-enable it at the end.
tsv := framework.Server
tsv.EnableHistorian(false)
tsv.SetTracking(false)
defer tsv.EnableHistorian(true)
defer tsv.SetTracking(true)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tsv.EnableHistorian(true)
tsv.SetTracking(true)

target := &querypb.Target{
Keyspace: "vttest",
Shard: "0",
TabletType: tabletpb.TabletType_MASTER,
Cell: "",
}
filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "/.*/",
}},
}
longDDL := "create table vitess_version ("
for i := 0; i < 100; i++ {
col := fmt.Sprintf("id%d_%s int", i, strings.Repeat("0", 10))
if i != 99 {
col += ", "
}
longDDL += col
}
longDDL += ")"

var cases = []test{
{
query: longDDL,
output: append(append([]string{
`gtid`, //gtid+other => vstream current pos
`other`,
`gtid`, //gtid+ddl => actual query
fmt.Sprintf(`type:DDL ddl:"%s" `, longDDL)},
getSchemaVersionTableCreationEvents()...),
`version`,
`gtid`,
),
},
}
eventCh := make(chan []*binlogdatapb.VEvent)
var startPos string
send := func(events []*binlogdatapb.VEvent) error {
var evs []*binlogdatapb.VEvent
for _, event := range events {
if event.Type == binlogdatapb.VEventType_GTID {
if startPos == "" {
startPos = event.Gtid
}
}
if event.Type == binlogdatapb.VEventType_HEARTBEAT {
continue
}
log.Infof("Received event %v", event)
evs = append(evs, event)
}
select {
case eventCh <- evs:
case <-ctx.Done():
return nil
}
return nil
}
go func() {
defer close(eventCh)
if err := tsv.VStream(ctx, target, "current", nil, filter, send); err != nil {
fmt.Printf("Error in tsv.VStream: %v", err)
t.Error(err)
}
}()
runCases(ctx, t, cases, eventCh)

cancel()

client := framework.NewClient()
client.Execute("drop table vitess_version", nil)
client.Execute("drop table _vt.schema_version", nil)
}

func runCases(ctx context.Context, t *testing.T, tests []test, eventCh chan []*binlogdatapb.VEvent) {
t.Helper()
client := framework.NewClient()
Expand Down
64 changes: 63 additions & 1 deletion go/vt/vttablet/tabletserver/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"sync"
"time"

"vitess.io/vitess/go/mysql"

"vitess.io/vitess/go/vt/sqlparser"

"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -97,6 +99,7 @@ func (tr *Tracker) Open() {
tr.cancel = cancel
tr.wg.Add(1)
log.Info("Schema tracker enabled.")

go tr.process(ctx)
}

Expand Down Expand Up @@ -130,6 +133,10 @@ func (tr *Tracker) Enable(enabled bool) {
func (tr *Tracker) process(ctx context.Context) {
defer tr.env.LogError()
defer tr.wg.Done()
if err := tr.possiblyInsertInitialSchema(ctx); err != nil {
log.Errorf("possiblyInsertInitialSchema eror: %v", err)
return
}

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Expand Down Expand Up @@ -163,14 +170,69 @@ func (tr *Tracker) process(ctx context.Context) {
}
}

func (tr *Tracker) currentPosition(ctx context.Context) (mysql.Position, error) {
conn, err := tr.engine.cp.Connect(ctx)
if err != nil {
return mysql.Position{}, err
}
defer conn.Close()
return conn.MasterPosition()
}

func (tr *Tracker) isSchemaVersionTableEmpty(ctx context.Context) (bool, error) {
conn, err := tr.engine.GetConnection(ctx)
if err != nil {
return false, err
}
defer conn.Recycle()
result, err := withDDL.Exec(ctx, "select id from _vt.schema_version limit 1", conn.Exec)
if err != nil {
return false, err
}
if len(result.Rows) == 0 {
return true, nil
}
return false, nil
}

// possiblyInsertInitialSchema stores the latest schema when a tracker starts and the schema_version table is empty
// this enables the right schema to be available between the time the tracker starts first and the first DDL is applied
func (tr *Tracker) possiblyInsertInitialSchema(ctx context.Context) error {
var err error
needsWarming, err := tr.isSchemaVersionTableEmpty(ctx)
if err != nil {
return err
}
if !needsWarming { // _vt.schema_version is not empty, nothing to do here
return nil
}
if err = tr.engine.Reload(ctx); err != nil {
return err
}

timestamp := time.Now().UnixNano() / 1e9
ddl := ""
pos, err := tr.currentPosition(ctx)
if err != nil {
return err
}
gtid := mysql.EncodePosition(pos)
log.Infof("Saving initial schema for gtid %s", gtid)

return tr.saveCurrentSchemaToDb(ctx, gtid, ddl, timestamp)
}

func (tr *Tracker) schemaUpdated(gtid string, ddl string, timestamp int64) error {
log.Infof("Processing schemaUpdated event for gtid %s, ddl %s", gtid, ddl)
if gtid == "" || ddl == "" {
return fmt.Errorf("got invalid gtid or ddl in schemaUpdated")
}
ctx := context.Background()

// Engine will have reloaded the schema because vstream will reload it on a DDL
return tr.saveCurrentSchemaToDb(ctx, gtid, ddl, timestamp)
}

func (tr *Tracker) saveCurrentSchemaToDb(ctx context.Context, gtid, ddl string, timestamp int64) error {
tables := tr.engine.GetSchema()
dbSchema := &binlogdatapb.MinimalSchema{
Tables: []*binlogdatapb.MinimalTable{},
Expand Down
63 changes: 58 additions & 5 deletions go/vt/vttablet/tabletserver/schema/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,35 @@ package schema
import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"golang.org/x/net/context"
"vitess.io/vitess/go/sqltypes"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
)

func TestTracker(t *testing.T) {
initialSchemaInserted := false
se, db, cancel := getTestSchemaEngine(t)
defer cancel()

gtid1 := "MySQL56/7b04699f-f5e9-11e9-bf88-9cb6d089e1c3:1-10"
ddl1 := "create table tracker_test (id int)"
query := "CREATE TABLE IF NOT EXISTS _vt.schema_version.*"
db.AddQueryPattern(query, &sqltypes.Result{})

db.AddQueryPattern("insert into _vt.schema_version.*", &sqltypes.Result{})

db.AddQueryPattern("insert into _vt.schema_version.*1-10.*", &sqltypes.Result{})
db.AddQueryPatternWithCallback("insert into _vt.schema_version.*1-3.*", &sqltypes.Result{}, func(query string) {
initialSchemaInserted = true
})
// simulates empty schema_version table, so initial schema should be inserted
db.AddQuery("select id from _vt.schema_version limit 1", &sqltypes.Result{Rows: [][]sqltypes.Value{}})
// called to get current position
db.AddQuery("SELECT @@GLOBAL.gtid_executed", sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"",
"varchar"),
"7b04699f-f5e9-11e9-bf88-9cb6d089e1c3:1-3",
))
vs := &fakeVstreamer{
done: make(chan struct{}),
events: [][]*binlogdatapb.VEvent{{
Expand Down Expand Up @@ -74,7 +85,49 @@ func TestTracker(t *testing.T) {
tracker.Close()
// Two of those events should have caused an error.
final := env.Stats().ErrorCounters.Counts()["INTERNAL"]
assert.Equal(t, initial+2, final)
require.Equal(t, initial+2, final)
require.True(t, initialSchemaInserted)
}

func TestTrackerShouldNotInsertInitialSchema(t *testing.T) {
initialSchemaInserted := false
se, db, cancel := getTestSchemaEngine(t)
gtid1 := "MySQL56/7b04699f-f5e9-11e9-bf88-9cb6d089e1c3:1-10"

defer cancel()
// simulates existing rows in schema_version, so initial schema should not be inserted
db.AddQuery("select id from _vt.schema_version limit 1", sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id",
"int"),
"1",
))
// called to get current position
db.AddQuery("SELECT @@GLOBAL.gtid_executed", sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"",
"varchar"),
"7b04699f-f5e9-11e9-bf88-9cb6d089e1c3:1-3",
))
db.AddQueryPatternWithCallback("insert into _vt.schema_version.*1-3.*", &sqltypes.Result{}, func(query string) {
initialSchemaInserted = true
})
vs := &fakeVstreamer{
done: make(chan struct{}),
events: [][]*binlogdatapb.VEvent{{
{
Type: binlogdatapb.VEventType_GTID,
Gtid: gtid1,
},
}},
}
config := se.env.Config()
config.TrackSchemaVersions = true
env := tabletenv.NewEnv(config, "TrackerTest")
tracker := NewTracker(env, vs, se)
tracker.Open()
<-vs.done
cancel()
tracker.Close()
require.False(t, initialSchemaInserted)
}

var _ VStreamer = (*fakeVstreamer)(nil)
Expand Down

0 comments on commit 6032234

Please sign in to comment.