From 648f93c6fed96a74a3cbb9b6f8e0eefcfdc90e6d Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Fri, 10 Jul 2020 09:25:58 +0200 Subject: [PATCH 1/6] Schema tracking: initial schema insert Signed-off-by: Rohit Nayak --- go/mysql/fakesqldb/server.go | 30 ++++++--- go/vt/vttablet/tabletserver/schema/tracker.go | 66 ++++++++++++++++++- .../tabletserver/schema/tracker_test.go | 59 +++++++++++++++-- 3 files changed, 141 insertions(+), 14 deletions(-) diff --git a/go/mysql/fakesqldb/server.go b/go/mysql/fakesqldb/server.go index cfd524334f3..71bf08f06c8 100644 --- a/go/mysql/fakesqldb/server.go +++ b/go/mysql/fakesqldb/server.go @@ -116,6 +116,9 @@ type DB struct { // connections tracks all open connections. // The key for the map is the value of mysql.Conn.ConnectionID. connections map[uint32]*mysql.Conn + + // queryPatternUserCallback stores optional callbacks when a query with a pattern is called + queryPatternUserCallback map[*regexp.Regexp]func(string) } // QueryHandler is the interface used by the DB to simulate executed queries @@ -157,13 +160,14 @@ func New(t *testing.T) *DB { // Create our DB. db := &DB{ - t: t, - socketFile: socketFile, - name: "fakesqldb", - data: make(map[string]*ExpectedResult), - rejectedData: make(map[string]error), - queryCalled: make(map[string]int), - connections: make(map[uint32]*mysql.Conn), + t: t, + socketFile: socketFile, + name: "fakesqldb", + data: make(map[string]*ExpectedResult), + rejectedData: make(map[string]error), + queryCalled: make(map[string]int), + connections: make(map[uint32]*mysql.Conn), + queryPatternUserCallback: make(map[*regexp.Regexp]func(string)), } db.Handler = db @@ -344,7 +348,6 @@ func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.R defer db.mu.Unlock() db.queryCalled[key]++ db.querylog = append(db.querylog, key) - // Check if we should close the connection and provoke errno 2013. if db.shouldClose { c.Close() @@ -384,6 +387,10 @@ func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.R // Check query patterns from AddQueryPattern(). for _, pat := range db.patternData { if pat.expr.MatchString(query) { + userCallback, ok := db.queryPatternUserCallback[pat.expr] + if ok { + userCallback(query) + } return callback(pat.result) } } @@ -500,6 +507,13 @@ func (db *DB) AddQueryPattern(queryPattern string, expectedResult *sqltypes.Resu db.patternData = append(db.patternData, exprResult{expr, &result}) } +// AddQueryPatternWithCallback is similar to AddQueryPattern: in addition it calls the provided callback function +// The callback can be used to set user counters/variables for testing specific usecases +func (db *DB) AddQueryPatternWithCallback(queryPattern string, expectedResult *sqltypes.Result, callback func(string)) { + db.AddQueryPattern(queryPattern, expectedResult) + db.queryPatternUserCallback[db.patternData[len(db.patternData)-1].expr] = callback +} + // DeleteQuery deletes query from the fake DB. func (db *DB) DeleteQuery(query string) { db.mu.Lock() diff --git a/go/vt/vttablet/tabletserver/schema/tracker.go b/go/vt/vttablet/tabletserver/schema/tracker.go index ea9abaf0221..604be6f1723 100644 --- a/go/vt/vttablet/tabletserver/schema/tracker.go +++ b/go/vt/vttablet/tabletserver/schema/tracker.go @@ -23,6 +23,8 @@ import ( "sync" "time" + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/vt/sqlparser" "github.com/gogo/protobuf/proto" @@ -97,6 +99,10 @@ func (tr *Tracker) Open() { tr.cancel = cancel tr.wg.Add(1) log.Info("Schema tracker enabled.") + if err := tr.possiblyInsertInitialSchema(ctx); err != nil { + log.Errorf("possiblyInsertInitialSchema eror: %v", err) + return + } go tr.process(ctx) } @@ -163,14 +169,72 @@ func (tr *Tracker) process(ctx context.Context) { } } +func (tr *Tracker) currentPosition(ctx context.Context) (mysql.Position, error) { + conn, err := tr.engine.cp.Connect(ctx) + if err != nil { + return mysql.Position{}, err + } + defer conn.Close() + return conn.MasterPosition() +} + +func (tr *Tracker) isSchemaVersionTableEmpty(ctx context.Context) (bool, error) { + conn, err := tr.engine.GetConnection(ctx) + if err != nil { + return false, err + } + defer conn.Recycle() + result, err := withDDL.Exec(ctx, "select id from _vt.schema_version limit 1", conn.Exec) + if err != nil { + return false, err + } + if result == nil { + return false, fmt.Errorf("error querying _vt.schema_version") + } + if result.RowsAffected == 0 { + return true, nil + } + return false, nil +} + +// possiblyInsertInitialSchema stores the latest schema when a tracker starts and the schema_version table is empty +// this enables the right schema to be available between the time the tracker starts first and the first DDL is applied +func (tr *Tracker) possiblyInsertInitialSchema(ctx context.Context) error { + var err error + needsWarming, err := tr.isSchemaVersionTableEmpty(ctx) + if err != nil { + return err + } + if !needsWarming { // _vt.schema_version is not empty, nothing to do here + return nil + } + if err = tr.engine.Reload(ctx); err != nil { + return err + } + + timestamp := time.Now().UnixNano() / 1e9 + ddl := "" + pos, err := tr.currentPosition(ctx) + if err != nil { + return err + } + gtid := mysql.EncodePosition(pos) + log.Infof("Saving initial schema for gtid %s", gtid) + + return tr.saveCurrentSchemaToDb(ctx, gtid, ddl, timestamp) +} + func (tr *Tracker) schemaUpdated(gtid string, ddl string, timestamp int64) error { log.Infof("Processing schemaUpdated event for gtid %s, ddl %s", gtid, ddl) if gtid == "" || ddl == "" { return fmt.Errorf("got invalid gtid or ddl in schemaUpdated") } ctx := context.Background() - // Engine will have reloaded the schema because vstream will reload it on a DDL + return tr.saveCurrentSchemaToDb(ctx, gtid, ddl, timestamp) +} + +func (tr *Tracker) saveCurrentSchemaToDb(ctx context.Context, gtid, ddl string, timestamp int64) error { tables := tr.engine.GetSchema() dbSchema := &binlogdatapb.MinimalSchema{ Tables: []*binlogdatapb.MinimalTable{}, diff --git a/go/vt/vttablet/tabletserver/schema/tracker_test.go b/go/vt/vttablet/tabletserver/schema/tracker_test.go index 3bbe1d63ba4..0d8b2474c63 100644 --- a/go/vt/vttablet/tabletserver/schema/tracker_test.go +++ b/go/vt/vttablet/tabletserver/schema/tracker_test.go @@ -19,7 +19,8 @@ package schema import ( "testing" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/net/context" "vitess.io/vitess/go/sqltypes" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" @@ -27,16 +28,26 @@ import ( ) func TestTracker(t *testing.T) { + initialSchemaInserted := false se, db, cancel := getTestSchemaEngine(t) defer cancel() - gtid1 := "MySQL56/7b04699f-f5e9-11e9-bf88-9cb6d089e1c3:1-10" ddl1 := "create table tracker_test (id int)" query := "CREATE TABLE IF NOT EXISTS _vt.schema_version.*" db.AddQueryPattern(query, &sqltypes.Result{}) - db.AddQueryPattern("insert into _vt.schema_version.*", &sqltypes.Result{}) - + db.AddQueryPattern("insert into _vt.schema_version.*1-10.*", &sqltypes.Result{}) + db.AddQueryPatternWithCallback("insert into _vt.schema_version.*1-3.*", &sqltypes.Result{}, func(query string) { + initialSchemaInserted = true + }) + // simulates empty schema_version table, so initial schema should be inserted + db.AddQuery("select id from _vt.schema_version limit 1", &sqltypes.Result{RowsAffected: 0}) + // called to get current position + db.AddQuery("SELECT @@GLOBAL.gtid_executed", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "", + "varchar"), + "7b04699f-f5e9-11e9-bf88-9cb6d089e1c3:1-3", + )) vs := &fakeVstreamer{ done: make(chan struct{}), events: [][]*binlogdatapb.VEvent{{ @@ -74,7 +85,45 @@ func TestTracker(t *testing.T) { tracker.Close() // Two of those events should have caused an error. final := env.Stats().ErrorCounters.Counts()["INTERNAL"] - assert.Equal(t, initial+2, final) + require.Equal(t, initial+2, final) + require.True(t, initialSchemaInserted) +} + +func TestTrackerShouldNotInsertInitialSchema(t *testing.T) { + initialSchemaInserted := false + se, db, cancel := getTestSchemaEngine(t) + gtid1 := "MySQL56/7b04699f-f5e9-11e9-bf88-9cb6d089e1c3:1-10" + + defer cancel() + // simulates existing rows in schema_version, so initial schema should not be inserted + db.AddQuery("select id from _vt.schema_version limit 1", &sqltypes.Result{RowsAffected: 1}) + // called to get current position + db.AddQuery("SELECT @@GLOBAL.gtid_executed", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "", + "varchar"), + "7b04699f-f5e9-11e9-bf88-9cb6d089e1c3:1-3", + )) + db.AddQueryPatternWithCallback("insert into _vt.schema_version.*1-3.*", &sqltypes.Result{}, func(query string) { + initialSchemaInserted = true + }) + vs := &fakeVstreamer{ + done: make(chan struct{}), + events: [][]*binlogdatapb.VEvent{{ + { + Type: binlogdatapb.VEventType_GTID, + Gtid: gtid1, + }, + }}, + } + config := se.env.Config() + config.TrackSchemaVersions = true + env := tabletenv.NewEnv(config, "TrackerTest") + tracker := NewTracker(env, vs, se) + tracker.Open() + <-vs.done + cancel() + tracker.Close() + require.False(t, initialSchemaInserted) } var _ VStreamer = (*fakeVstreamer)(nil) From 967c9fd24a55fdddae8b363482dfe03ea507a174 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Fri, 10 Jul 2020 11:35:28 +0200 Subject: [PATCH 2/6] Initial schema: log output if vtctlclient errors out, for debugging CI errors Signed-off-by: Rohit Nayak --- go/test/endtoend/cluster/vtctlclient_process.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/go/test/endtoend/cluster/vtctlclient_process.go b/go/test/endtoend/cluster/vtctlclient_process.go index edaa2fe9a52..881736bbd7c 100644 --- a/go/test/endtoend/cluster/vtctlclient_process.go +++ b/go/test/endtoend/cluster/vtctlclient_process.go @@ -73,7 +73,11 @@ func (vtctlclient *VtctlClientProcess) ExecuteCommand(args ...string) (err error pArgs..., ) log.Infof("Executing vtctlclient with command: %v", strings.Join(tmpProcess.Args, " ")) - return tmpProcess.Run() + output, err := tmpProcess.Output() + if err != nil { + log.Errorf("Error executing %s: output %s, err %v", strings.Join(tmpProcess.Args, " "), output, err) + } + return err } // ExecuteCommandWithOutput executes any vtctlclient command and returns output From cfe1867bbb41e19efc9b366b27ceeef313624af5 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Fri, 10 Jul 2020 11:54:14 +0200 Subject: [PATCH 3/6] Initial Schema: more debugging for CI errors Signed-off-by: Rohit Nayak --- go/test/endtoend/cluster/vtctlclient_process.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/go/test/endtoend/cluster/vtctlclient_process.go b/go/test/endtoend/cluster/vtctlclient_process.go index 881736bbd7c..06e82af84a6 100644 --- a/go/test/endtoend/cluster/vtctlclient_process.go +++ b/go/test/endtoend/cluster/vtctlclient_process.go @@ -36,11 +36,15 @@ type VtctlClientProcess struct { // InitShardMaster executes vtctlclient command to make one of tablet as master func (vtctlclient *VtctlClientProcess) InitShardMaster(Keyspace string, Shard string, Cell string, TabletUID int) (err error) { - return vtctlclient.ExecuteCommand( + output, err := vtctlclient.ExecuteCommandWithOutput( "InitShardMaster", "-force", fmt.Sprintf("%s/%s", Keyspace, Shard), fmt.Sprintf("%s-%d", Cell, TabletUID)) + if err != nil { + log.Errorf("error in InitShardMaster output %s, err %s", output, err.Error()) + } + return err } // ApplySchema applies SQL schema to the keyspace From 9d75dca93f180234fa3c2853b0cf2519db1638e0 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Sun, 12 Jul 2020 19:37:37 +0200 Subject: [PATCH 4/6] Initial Schema: move initial schema code to tr.process() Signed-off-by: Rohit Nayak --- go/vt/vttablet/tabletserver/schema/tracker.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/go/vt/vttablet/tabletserver/schema/tracker.go b/go/vt/vttablet/tabletserver/schema/tracker.go index 604be6f1723..4aa0785fc9e 100644 --- a/go/vt/vttablet/tabletserver/schema/tracker.go +++ b/go/vt/vttablet/tabletserver/schema/tracker.go @@ -99,10 +99,7 @@ func (tr *Tracker) Open() { tr.cancel = cancel tr.wg.Add(1) log.Info("Schema tracker enabled.") - if err := tr.possiblyInsertInitialSchema(ctx); err != nil { - log.Errorf("possiblyInsertInitialSchema eror: %v", err) - return - } + go tr.process(ctx) } @@ -136,6 +133,10 @@ func (tr *Tracker) Enable(enabled bool) { func (tr *Tracker) process(ctx context.Context) { defer tr.env.LogError() defer tr.wg.Done() + if err := tr.possiblyInsertInitialSchema(ctx); err != nil { + log.Errorf("possiblyInsertInitialSchema eror: %v", err) + return + } filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ From 67e8f696a01b70a10554b29ac9b44cd02f45d490 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Sun, 12 Jul 2020 20:18:44 +0200 Subject: [PATCH 5/6] Initial Schema: remove unnecessary test Signed-off-by: Rohit Nayak --- go/vt/vttablet/endtoend/vstreamer_test.go | 88 ----------------------- 1 file changed, 88 deletions(-) diff --git a/go/vt/vttablet/endtoend/vstreamer_test.go b/go/vt/vttablet/endtoend/vstreamer_test.go index 67e7aac05bf..b2c2d026150 100644 --- a/go/vt/vttablet/endtoend/vstreamer_test.go +++ b/go/vt/vttablet/endtoend/vstreamer_test.go @@ -320,94 +320,6 @@ func TestSchemaVersioning(t *testing.T) { log.Info("=== END OF TEST") } -func TestSchemaVersioningLongDDL(t *testing.T) { - // Let's disable the already running tracker to prevent it from - // picking events from the previous test, and then re-enable it at the end. - tsv := framework.Server - tsv.EnableHistorian(false) - tsv.SetTracking(false) - defer tsv.EnableHistorian(true) - defer tsv.SetTracking(true) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - tsv.EnableHistorian(true) - tsv.SetTracking(true) - - target := &querypb.Target{ - Keyspace: "vttest", - Shard: "0", - TabletType: tabletpb.TabletType_MASTER, - Cell: "", - } - filter := &binlogdatapb.Filter{ - Rules: []*binlogdatapb.Rule{{ - Match: "/.*/", - }}, - } - longDDL := "create table vitess_version (" - for i := 0; i < 100; i++ { - col := fmt.Sprintf("id%d_%s int", i, strings.Repeat("0", 10)) - if i != 99 { - col += ", " - } - longDDL += col - } - longDDL += ")" - - var cases = []test{ - { - query: longDDL, - output: append(append([]string{ - `gtid`, //gtid+other => vstream current pos - `other`, - `gtid`, //gtid+ddl => actual query - fmt.Sprintf(`type:DDL ddl:"%s" `, longDDL)}, - getSchemaVersionTableCreationEvents()...), - `version`, - `gtid`, - ), - }, - } - eventCh := make(chan []*binlogdatapb.VEvent) - var startPos string - send := func(events []*binlogdatapb.VEvent) error { - var evs []*binlogdatapb.VEvent - for _, event := range events { - if event.Type == binlogdatapb.VEventType_GTID { - if startPos == "" { - startPos = event.Gtid - } - } - if event.Type == binlogdatapb.VEventType_HEARTBEAT { - continue - } - log.Infof("Received event %v", event) - evs = append(evs, event) - } - select { - case eventCh <- evs: - case <-ctx.Done(): - return nil - } - return nil - } - go func() { - defer close(eventCh) - if err := tsv.VStream(ctx, target, "current", nil, filter, send); err != nil { - fmt.Printf("Error in tsv.VStream: %v", err) - t.Error(err) - } - }() - runCases(ctx, t, cases, eventCh) - - cancel() - - client := framework.NewClient() - client.Execute("drop table vitess_version", nil) - client.Execute("drop table _vt.schema_version", nil) -} - func runCases(ctx context.Context, t *testing.T, tests []test, eventCh chan []*binlogdatapb.VEvent) { client := framework.NewClient() From febb16390ecbb04015c430a53360dc5e3ae01850 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Tue, 14 Jul 2020 14:03:11 +0200 Subject: [PATCH 6/6] Initial Schema: address review comments Signed-off-by: Rohit Nayak --- go/vt/vttablet/tabletserver/schema/tracker.go | 5 +---- go/vt/vttablet/tabletserver/schema/tracker_test.go | 8 ++++++-- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/go/vt/vttablet/tabletserver/schema/tracker.go b/go/vt/vttablet/tabletserver/schema/tracker.go index 4aa0785fc9e..35abb422ca0 100644 --- a/go/vt/vttablet/tabletserver/schema/tracker.go +++ b/go/vt/vttablet/tabletserver/schema/tracker.go @@ -189,10 +189,7 @@ func (tr *Tracker) isSchemaVersionTableEmpty(ctx context.Context) (bool, error) if err != nil { return false, err } - if result == nil { - return false, fmt.Errorf("error querying _vt.schema_version") - } - if result.RowsAffected == 0 { + if len(result.Rows) == 0 { return true, nil } return false, nil diff --git a/go/vt/vttablet/tabletserver/schema/tracker_test.go b/go/vt/vttablet/tabletserver/schema/tracker_test.go index 0d8b2474c63..5435b7fa5dd 100644 --- a/go/vt/vttablet/tabletserver/schema/tracker_test.go +++ b/go/vt/vttablet/tabletserver/schema/tracker_test.go @@ -41,7 +41,7 @@ func TestTracker(t *testing.T) { initialSchemaInserted = true }) // simulates empty schema_version table, so initial schema should be inserted - db.AddQuery("select id from _vt.schema_version limit 1", &sqltypes.Result{RowsAffected: 0}) + db.AddQuery("select id from _vt.schema_version limit 1", &sqltypes.Result{Rows: [][]sqltypes.Value{}}) // called to get current position db.AddQuery("SELECT @@GLOBAL.gtid_executed", sqltypes.MakeTestResult(sqltypes.MakeTestFields( "", @@ -96,7 +96,11 @@ func TestTrackerShouldNotInsertInitialSchema(t *testing.T) { defer cancel() // simulates existing rows in schema_version, so initial schema should not be inserted - db.AddQuery("select id from _vt.schema_version limit 1", &sqltypes.Result{RowsAffected: 1}) + db.AddQuery("select id from _vt.schema_version limit 1", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "id", + "int"), + "1", + )) // called to get current position db.AddQuery("SELECT @@GLOBAL.gtid_executed", sqltypes.MakeTestResult(sqltypes.MakeTestFields( "",