From 34fb516704b4b0c1f60eac4447ac4a0b0bcb9a41 Mon Sep 17 00:00:00 2001 From: luancheng Date: Thu, 18 Jul 2019 16:39:04 +0800 Subject: [PATCH 1/6] release mysql dependence in syncer test (1) --- pkg/binlog/event/dml.go | 2 +- pkg/binlog/event/event.go | 10 +- pkg/binlog/event/event_test.go | 20 +- syncer/syncer.go | 1 - syncer/syncer_test.go | 759 ++++++++++++++++++++++++--------- 5 files changed, 583 insertions(+), 209 deletions(-) diff --git a/pkg/binlog/event/dml.go b/pkg/binlog/event/dml.go index 11b3e0cc00..9194f5ae78 100644 --- a/pkg/binlog/event/dml.go +++ b/pkg/binlog/event/dml.go @@ -80,7 +80,7 @@ func GenDMLEvents(flavor string, serverID uint32, latestPos uint32, latestGTID g events = append(events, tableMapEv) // RowsEvent - rowsEv, err2 := GenRowsEvent(header, latestPos, eventType, data.TableID, defaultRowsFlag, data.Rows, data.ColumnType) + rowsEv, err2 := GenRowsEvent(header, latestPos, eventType, data.TableID, defaultRowsFlag, data.Rows, data.ColumnType, tableMapEv) if err2 != nil { return nil, errors.Annotatef(err2, "generate RowsEvent for `%s`.`%s`", data.Schema, data.Table) } diff --git a/pkg/binlog/event/event.go b/pkg/binlog/event/event.go index 66b00a6b72..7a9c5aa7f3 100644 --- a/pkg/binlog/event/event.go +++ b/pkg/binlog/event/event.go @@ -517,7 +517,7 @@ func GenTableMapEvent(header *replication.EventHeader, latestPos uint32, tableID // DELETE_ROWS_EVENTv0, DELETE_ROWS_EVENTv1, DELETE_ROWS_EVENTv2 // ref: https://dev.mysql.com/doc/internals/en/rows-event.html // ref: http://blog.51cto.com/yanzongshuai/2090894 -func GenRowsEvent(header *replication.EventHeader, latestPos uint32, eventType replication.EventType, tableID uint64, rowsFlags uint16, rows [][]interface{}, columnType []byte) (*replication.BinlogEvent, error) { +func GenRowsEvent(header *replication.EventHeader, latestPos uint32, eventType replication.EventType, tableID uint64, rowsFlags uint16, rows [][]interface{}, columnType []byte, tableMapEv *replication.BinlogEvent) (*replication.BinlogEvent, error) { switch eventType { case replication.WRITE_ROWS_EVENTv0, replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2, replication.UPDATE_ROWS_EVENTv0, replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2, @@ -675,9 +675,11 @@ func GenRowsEvent(header *replication.EventHeader, latestPos uint32, eventType r } // parse TableMapEvent - tableMapEv, err := GenTableMapEvent(header, latestPos, tableID, []byte("schema-placeholder"), []byte("table-placeholder"), columnType) - if err != nil { - return nil, errors.Annotate(err, "generate TableMapEvent") + if tableMapEv == nil { + tableMapEv, err = GenTableMapEvent(header, latestPos, tableID, []byte("schema-placeholder"), []byte("table-placeholder"), columnType) + if err != nil { + return nil, errors.Annotate(err, "generate TableMapEvent") + } } _, err = parse2.ParseSingleEvent(bytes.NewReader(tableMapEv.RawData), onEventFunc) if err != nil { diff --git a/pkg/binlog/event/event_test.go b/pkg/binlog/event/event_test.go index 9307e19340..ad2bde4fc7 100644 --- a/pkg/binlog/event/event_test.go +++ b/pkg/binlog/event/event_test.go @@ -490,26 +490,26 @@ func (t *testEventSuite) TestGenRowsEvent(c *C) { ) // invalid eventType, rows and columnType - rowsEv, err := GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType) + rowsEv, err := GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType, nil) c.Assert(err, NotNil) c.Assert(rowsEv, IsNil) // valid eventType, invalid rows and columnType eventType = replication.WRITE_ROWS_EVENTv0 - rowsEv, err = GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType) + rowsEv, err = GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType, nil) c.Assert(err, NotNil) c.Assert(rowsEv, IsNil) // valid eventType and rows, invalid columnType row := []interface{}{int32(1)} rows = append(rows, row) - rowsEv, err = GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType) + rowsEv, err = GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType, nil) c.Assert(err, NotNil) c.Assert(rowsEv, IsNil) // all valid columnType = []byte{gmysql.MYSQL_TYPE_LONG} - rowsEv, err = GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType) + rowsEv, err = GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType, nil) c.Assert(err, IsNil) c.Assert(rowsEv, NotNil) @@ -530,7 +530,7 @@ func (t *testEventSuite) TestGenRowsEvent(c *C) { // multi rows, with different length, invalid rows = append(rows, []interface{}{int32(1), int32(2)}) - rowsEv, err = GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType) + rowsEv, err = GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType, nil) c.Assert(err, NotNil) c.Assert(rowsEv, IsNil) @@ -539,7 +539,7 @@ func (t *testEventSuite) TestGenRowsEvent(c *C) { rows = append(rows, []interface{}{int32(1), int32(2)}) rows = append(rows, []interface{}{int32(3), int32(4)}) columnType = []byte{gmysql.MYSQL_TYPE_LONG, gmysql.MYSQL_TYPE_LONG} - rowsEv, err = GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType) + rowsEv, err = GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType, nil) c.Assert(err, IsNil) c.Assert(rowsEv, NotNil) // verify the body @@ -556,7 +556,7 @@ func (t *testEventSuite) TestGenRowsEvent(c *C) { replication.DELETE_ROWS_EVENTv0, replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2, } for _, eventType = range evTypes { - rowsEv, err = GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType) + rowsEv, err = GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType, nil) c.Assert(err, IsNil) c.Assert(rowsEv, NotNil) c.Assert(rowsEv.Header.EventType, Equals, eventType) @@ -568,7 +568,7 @@ func (t *testEventSuite) TestGenRowsEvent(c *C) { float32(1.23), float64(4.56), "string with type STRING"}) columnType = []byte{gmysql.MYSQL_TYPE_LONG, gmysql.MYSQL_TYPE_TINY, gmysql.MYSQL_TYPE_SHORT, gmysql.MYSQL_TYPE_INT24, gmysql.MYSQL_TYPE_LONGLONG, gmysql.MYSQL_TYPE_FLOAT, gmysql.MYSQL_TYPE_DOUBLE, gmysql.MYSQL_TYPE_STRING} - rowsEv, err = GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType) + rowsEv, err = GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType, nil) c.Assert(err, IsNil) c.Assert(rowsEv, NotNil) // verify the body @@ -580,7 +580,7 @@ func (t *testEventSuite) TestGenRowsEvent(c *C) { // column type mismatch rows[0][0] = int8(1) - rowsEv, err = GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType) + rowsEv, err = GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType, nil) c.Assert(err, NotNil) c.Assert(rowsEv, IsNil) @@ -596,7 +596,7 @@ func (t *testEventSuite) TestGenRowsEvent(c *C) { gmysql.MYSQL_TYPE_BLOB, gmysql.MYSQL_TYPE_JSON, gmysql.MYSQL_TYPE_GEOMETRY} for i := range unsupportedTypes { columnType = unsupportedTypes[i : i+1] - rowsEv, err = GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType) + rowsEv, err = GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType, nil) c.Assert(err, NotNil) c.Assert(strings.Contains(err.Error(), "not supported"), IsTrue) c.Assert(rowsEv, IsNil) diff --git a/syncer/syncer.go b/syncer/syncer.go index f908ad8f75..a755b89010 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -997,7 +997,6 @@ func (s *Syncer) Run(ctx context.Context) (err error) { s.tctx.L().Info("replicate binlog from checkpoint", zap.Stringer("checkpoint", lastPos)) s.streamer, err = s.streamerProducer.generateStreamer(lastPos) - if err != nil { return errors.Trace(err) } diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index 995c5f551b..b125a5fbcf 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -26,11 +26,13 @@ import ( "github.com/DATA-DOG/go-sqlmock" _ "github.com/go-sql-driver/mysql" . "github.com/pingcap/check" + "github.com/pingcap/dm/pkg/gtid" "github.com/pingcap/parser/ast" bf "github.com/pingcap/tidb-tools/pkg/binlog-filter" cm "github.com/pingcap/tidb-tools/pkg/column-mapping" "github.com/pingcap/tidb-tools/pkg/filter" router "github.com/pingcap/tidb-tools/pkg/table-router" + "github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go-mysql/replication" "go.uber.org/zap" @@ -50,10 +52,11 @@ func TestSuite(t *testing.T) { } type testSyncerSuite struct { - db *sql.DB - syncer *replication.BinlogSyncer - streamer *replication.BinlogStreamer - cfg *config.SubTaskConfig + db *sql.DB + syncer *replication.BinlogSyncer + streamer *replication.BinlogStreamer + cfg *config.SubTaskConfig + eventsGenerator *event.Generator } func (s *testSyncerSuite) SetUpSuite(c *C) { @@ -64,6 +67,7 @@ func (s *testSyncerSuite) SetUpSuite(c *C) { MetaSchema: "test", Name: "syncer_ut", Mode: config.ModeIncrement, + Flavor: "mysql", } s.cfg.From.Adjust() s.cfg.To.Adjust() @@ -80,11 +84,26 @@ func (s *testSyncerSuite) SetUpSuite(c *C) { s.resetMaster() s.resetBinlogSyncer() + s.resetEventsGenerator() _, err = s.db.Exec("SET GLOBAL binlog_format = 'ROW';") c.Assert(err, IsNil) } +func (s *testSyncerSuite) resetEventsGenerator() { + previousGTIDSetStr := "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-14,406a3f61-690d-11e7-87c5-6c92bf46f384:1-94321383" + previousGTIDSet, err := gtid.ParserGTID(s.cfg.Flavor, previousGTIDSetStr) + if err != nil { + log.L().Fatal("", zap.Error(err)) + } + latestGTIDStr := "3ccc475b-2343-11e7-be21-6c0b84d59f30:14" + latestGTID, err := gtid.ParserGTID(s.cfg.Flavor, latestGTIDStr) + s.eventsGenerator, err = event.NewGenerator(s.cfg.Flavor, uint32(s.cfg.ServerID), 0, latestGTID, previousGTIDSet, 0) + if err != nil { + log.L().Fatal("", zap.Error(err)) + } +} + func (s *testSyncerSuite) resetBinlogSyncer() { var err error cfg := replication.BinlogSyncerConfig{ @@ -178,7 +197,11 @@ func (s *testSyncerSuite) TestSelectDB(c *C) { skip: skips[i]}) } - p, err := utils.GetParser(s.db, false) + db, mock, err := sqlmock.New() + mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE"). + WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). + AddRow("sql_mode", "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION")) + p, err := utils.GetParser(db, false) c.Assert(err, IsNil) syncer := NewSyncer(s.cfg) @@ -212,8 +235,6 @@ func (s *testSyncerSuite) TestSelectDB(c *C) { } func (s *testSyncerSuite) TestSelectTable(c *C) { - s.resetBinlogSyncer() - s.cfg.BWList = &filter.Rules{ DoDBs: []string{"t2", "stest", "~^ptest*"}, DoTables: []*filter.Table{ @@ -222,33 +243,117 @@ func (s *testSyncerSuite) TestSelectTable(c *C) { {Schema: "~^ptest*", Name: "~^t.*"}, }, } + s.resetEventsGenerator() + allEvents := make([]*replication.BinlogEvent, 0, 24) + + evs, _, err := s.eventsGenerator.GenCreateDatabaseEvents("s1") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenCreateTableEvents("s1", "create table s1.log(id int)") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenDropDatabaseEvents("s1") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + + evs, _, err = s.eventsGenerator.GenCreateTableEvents("mysql", "create table mysql.test(id int)") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenDropTableEvents("mysql", "test") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenCreateDatabaseEvents("stest") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenCreateTableEvents("stest", "create table stest.log(id int)") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenCreateTableEvents("stest", "create table stest.t(id int)") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenCreateTableEvents("stest", "create table stest.log2(id int)") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + + columnType := []byte{mysql.MYSQL_TYPE_LONG} + insertRows := [][]interface{}{{int32(10)}} + dmlData := []*event.DMLData{ + { + TableID: 8, + Schema: "stest", + Table: "t", + ColumnType: columnType, + Rows: insertRows, + }, + } + eventType := replication.WRITE_ROWS_EVENTv2 + evs, _, err = s.eventsGenerator.GenDMLEvents(eventType, dmlData) + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + + dmlData = []*event.DMLData{ + { + TableID: 9, + Schema: "stest", + Table: "log", + ColumnType: columnType, + Rows: insertRows, + }, + } + evs, _, err = s.eventsGenerator.GenDMLEvents(eventType, dmlData) + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) - sqls := []string{ - "create database s1", - "create table s1.log(id int)", - "drop database s1", - - "create table mysql.test(id int)", - "drop table mysql.test", - "create database stest", - "create table stest.log(id int)", - "create table stest.t(id int)", - "create table stest.log2(id int)", - "insert into stest.t(id) values (10)", - "insert into stest.log(id) values (10)", - "insert into stest.log2(id) values (10)", - "drop table stest.log,stest.t,stest.log2", - "drop database stest", - - "create database t2", - "create table t2.log(id int)", - "create table t2.log1(id int)", - "drop table t2.log", - "drop database t2", - "create database ptest1", - "create table ptest1.t1(id int)", - "drop database ptest1", + dmlData = []*event.DMLData{ + { + TableID: 10, + Schema: "stest", + Table: "log2", + ColumnType: columnType, + Rows: insertRows, + }, } + evs, _, err = s.eventsGenerator.GenDMLEvents(eventType, dmlData) + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenDropTableEvents("stest", "log") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenDropTableEvents("stest", "t") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenDropTableEvents("stest", "log2") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenDropDatabaseEvents("stest") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + + evs, _, err = s.eventsGenerator.GenCreateDatabaseEvents("t2") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenCreateTableEvents("t2", "create table t2.log(id int)") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenCreateTableEvents("t2", "create table t2.log1(id int)") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenDropTableEvents("t2", "log") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenDropDatabaseEvents("t2") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenCreateDatabaseEvents("ptest1") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenCreateTableEvents("ptest1", "create table ptest1.t1(id int)") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenDropDatabaseEvents("ptest1") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + res := [][]bool{ {true}, {true}, @@ -263,7 +368,9 @@ func (s *testSyncerSuite) TestSelectTable(c *C) { {false}, {false}, {true}, - {false, false, true}, + {false}, + {false}, + {true}, {false}, {false}, @@ -276,22 +383,17 @@ func (s *testSyncerSuite) TestSelectTable(c *C) { {false}, } - for _, sql := range sqls { - s.db.Exec(sql) - } - - p, err := utils.GetParser(s.db, false) + db, mock, err := sqlmock.New() + mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE"). + WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). + AddRow("sql_mode", "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION")) + p, err := utils.GetParser(db, false) c.Assert(err, IsNil) syncer := NewSyncer(s.cfg) syncer.genRouter() - var i int - for { - if i == len(sqls) { - break - } - e, err := s.streamer.GetEvent(context.Background()) - c.Assert(err, IsNil) + i := 0 + for _, e := range allEvents { switch ev := e.Event.(type) { case *replication.QueryEvent: query := string(ev.Query) @@ -300,7 +402,6 @@ func (s *testSyncerSuite) TestSelectTable(c *C) { if !result.isDDL { continue // BEGIN event } - querys, _, err := syncer.resolveDDLSQL(p, result.stmt, string(ev.Schema)) c.Assert(err, IsNil) if len(querys) == 0 { @@ -337,44 +438,63 @@ func (s *testSyncerSuite) TestIgnoreDB(c *C) { IgnoreDBs: []string{"~^b.*", "s1", "stest"}, } - sqls := []string{ - "create database s1", - "drop database s1", - "create database s2", - "drop database s2", - "create database btest", - "drop database btest", - "create database b1", - "drop database b1", - "create database stest", - "drop database stest", - "create database st", - "drop database st", - } - res := []bool{true, true, false, false, true, true, true, true, true, true, false, false} + s.resetEventsGenerator() + allEvents := make([]*replication.BinlogEvent, 0, 12) - for _, sql := range sqls { - s.db.Exec(sql) - } + evs, _, err := s.eventsGenerator.GenCreateDatabaseEvents("s1") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenDropDatabaseEvents("s1") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenCreateDatabaseEvents("s2") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenDropDatabaseEvents("s2") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenCreateDatabaseEvents("btest") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenDropDatabaseEvents("btest") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenCreateDatabaseEvents("b1") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenDropDatabaseEvents("b1") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenCreateDatabaseEvents("stest") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenDropDatabaseEvents("stest") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenCreateDatabaseEvents("st") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenDropDatabaseEvents("st") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) - p, err := utils.GetParser(s.db, false) + res := []bool{true, true, false, false, true, true, true, true, true, true, false, false} + + db, mock, err := sqlmock.New() + mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE"). + WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). + AddRow("sql_mode", "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION")) + p, err := utils.GetParser(db, false) c.Assert(err, IsNil) syncer := NewSyncer(s.cfg) syncer.genRouter() i := 0 - for { - if i == len(sqls) { - break - } - - e, err := s.streamer.GetEvent(context.Background()) - c.Assert(err, IsNil) + for _, e := range allEvents { ev, ok := e.Event.(*replication.QueryEvent) if !ok { continue } - sql := string(ev.Query) stmt, err := p.ParseOneStmt(sql, "", "") c.Assert(err, IsNil) @@ -386,12 +506,9 @@ func (s *testSyncerSuite) TestIgnoreDB(c *C) { c.Assert(r, Equals, res[i]) i++ } - s.catchUpBinlog() } func (s *testSyncerSuite) TestIgnoreTable(c *C) { - s.resetBinlogSyncer() - s.cfg.BWList = &filter.Rules{ IgnoreDBs: []string{"t2"}, IgnoreTables: []*filter.Table{ @@ -400,29 +517,105 @@ func (s *testSyncerSuite) TestIgnoreTable(c *C) { }, } - sqls := []string{ - "create database s1", - "create table s1.log(id int)", - "drop database s1", - - "create table mysql.test(id int)", - "drop table mysql.test", - "create database stest", - "create table stest.log(id int)", - "create table stest.t(id int)", - "create table stest.log2(id int)", - "insert into stest.t(id) values (10)", - "insert into stest.log(id) values (10)", - "insert into stest.log2(id) values (10)", - "drop table stest.log,stest.t,stest.log2", - "drop database stest", - - "create database t2", - "create table t2.log(id int)", - "create table t2.log1(id int)", - "drop table t2.log", - "drop database t2", + s.resetEventsGenerator() + allEvents := make([]*replication.BinlogEvent, 0, 20) + + evs, _, err := s.eventsGenerator.GenCreateDatabaseEvents("s1") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenCreateTableEvents("s1", "create table s1.log(id int)") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenDropDatabaseEvents("s1") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + + evs, _, err = s.eventsGenerator.GenCreateTableEvents("mysql", "create table mysql.test(id int)") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenDropTableEvents("mysql", "test") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenCreateDatabaseEvents("stest") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenCreateTableEvents("stest", "create table stest.log(id int)") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenCreateTableEvents("stest", "create table stest.t(id int)") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenCreateTableEvents("stest", "create table stest.log2(id int)") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + columnType := []byte{mysql.MYSQL_TYPE_LONG} + insertRows := [][]interface{}{{int32(10)}} + dmlData := []*event.DMLData{ + { + TableID: 8, + Schema: "stest", + Table: "t", + ColumnType: columnType, + Rows: insertRows, + }, + } + eventType := replication.WRITE_ROWS_EVENTv2 + evs, _, err = s.eventsGenerator.GenDMLEvents(eventType, dmlData) + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + dmlData = []*event.DMLData{ + { + TableID: 9, + Schema: "stest", + Table: "log", + ColumnType: columnType, + Rows: insertRows, + }, } + evs, _, err = s.eventsGenerator.GenDMLEvents(eventType, dmlData) + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + dmlData = []*event.DMLData{ + { + TableID: 10, + Schema: "stest", + Table: "log2", + ColumnType: columnType, + Rows: insertRows, + }, + } + evs, _, err = s.eventsGenerator.GenDMLEvents(eventType, dmlData) + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenDropTableEvents("stest", "log") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenDropTableEvents("stest", "t") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenDropTableEvents("stest", "log2") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenDropDatabaseEvents("stest") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + + evs, _, err = s.eventsGenerator.GenCreateDatabaseEvents("t2") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenCreateTableEvents("t2", "create table t2.log(id int)") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenCreateTableEvents("t2", "create table t2.log1(id int)") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenDropTableEvents("t2", "log") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + evs, _, err = s.eventsGenerator.GenDropDatabaseEvents("t2") + c.Assert(err, IsNil) + allEvents = append(allEvents, evs...) + res := [][]bool{ {false}, {false}, @@ -437,7 +630,9 @@ func (s *testSyncerSuite) TestIgnoreTable(c *C) { {true}, {true}, {false}, - {true, true, false}, + {true}, + {true}, + {false}, {false}, {true}, @@ -447,23 +642,18 @@ func (s *testSyncerSuite) TestIgnoreTable(c *C) { {true}, } - for _, sql := range sqls { - s.db.Exec(sql) - } - - p, err := utils.GetParser(s.db, false) + db, mock, err := sqlmock.New() + mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE"). + WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). + AddRow("sql_mode", "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION")) + p, err := utils.GetParser(db, false) c.Assert(err, IsNil) syncer := NewSyncer(s.cfg) - syncer.genRouter() + i := 0 - for { - if i == len(sqls) { - break - } - e, err := s.streamer.GetEvent(context.Background()) - c.Assert(err, IsNil) + for _, e := range allEvents { switch ev := e.Event.(type) { case *replication.QueryEvent: query := string(ev.Query) @@ -497,15 +687,11 @@ func (s *testSyncerSuite) TestIgnoreTable(c *C) { default: continue } - i++ } - s.catchUpBinlog() } func (s *testSyncerSuite) TestSkipDML(c *C) { - s.resetBinlogSyncer() - s.cfg.FilterRules = []*bf.BinlogEventRule{ { SchemaPattern: "*", @@ -531,34 +717,173 @@ func (s *testSyncerSuite) TestSkipDML(c *C) { } s.cfg.BWList = nil - sqls := []struct { - sql string + s.resetEventsGenerator() + + type SQLChecker struct { + events []*replication.BinlogEvent isDML bool skipped bool - }{ - {"drop database if exists foo", false, false}, - {"create database foo", false, false}, - {"create table foo.bar(id int)", false, false}, - {"insert into foo.bar values(1)", true, false}, - {"update foo.bar set id=2", true, true}, - {"delete from foo.bar where id=2", true, true}, - {"drop database if exists foo1", false, false}, - {"create database foo1", false, false}, - {"create table foo1.bar1(id int)", false, false}, - {"insert into foo1.bar1 values(1)", true, false}, - {"update foo1.bar1 set id=2", true, true}, - {"delete from foo1.bar1 where id=2", true, true}, - {"create table foo1.bar2(id int)", false, false}, - {"insert into foo1.bar2 values(1)", true, false}, - {"update foo1.bar2 set id=2", true, true}, - {"delete from foo1.bar2 where id=2", true, true}, } - for i := range sqls { - s.db.Exec(sqls[i].sql) + sqls := make([]SQLChecker, 0, 16) + + evs, _, err := s.eventsGenerator.GenDropDatabaseEvents("foo") + c.Assert(err, IsNil) + sqls = append(sqls, SQLChecker{events: evs, isDML: false, skipped: false}) + + evs, _, err = s.eventsGenerator.GenCreateDatabaseEvents("foo") + c.Assert(err, IsNil) + sqls = append(sqls, SQLChecker{events: evs, isDML: false, skipped: false}) + evs, _, err = s.eventsGenerator.GenCreateTableEvents("foo", "create table foo.bar(id int)") + c.Assert(err, IsNil) + sqls = append(sqls, SQLChecker{events: evs, isDML: false, skipped: false}) + columnType := []byte{mysql.MYSQL_TYPE_LONG} + insertRows := [][]interface{}{{int32(1)}} + dmlData := []*event.DMLData{ + { + TableID: 8, + Schema: "foo", + Table: "bar", + ColumnType: columnType, + Rows: insertRows, + }, + } + eventType := replication.WRITE_ROWS_EVENTv2 + evs, _, err = s.eventsGenerator.GenDMLEvents(eventType, dmlData) + c.Assert(err, IsNil) + sqls = append(sqls, SQLChecker{events: evs, isDML: true, skipped: false}) + updateRows := [][]interface{}{{int32(2)}, {int32(1)}} + dmlData = []*event.DMLData{ + { + TableID: 8, + Schema: "foo", + Table: "bar", + ColumnType: columnType, + Rows: updateRows, + }, + } + eventType = replication.UPDATE_ROWS_EVENTv2 + evs, _, err = s.eventsGenerator.GenDMLEvents(eventType, dmlData) + c.Assert(err, IsNil) + sqls = append(sqls, SQLChecker{events: evs, isDML: true, skipped: true}) + deleteRows := [][]interface{}{{int32(2)}} + dmlData = []*event.DMLData{ + { + TableID: 8, + Schema: "foo", + Table: "bar", + ColumnType: columnType, + Rows: deleteRows, + }, + } + eventType = replication.DELETE_ROWS_EVENTv2 + evs, _, err = s.eventsGenerator.GenDMLEvents(eventType, dmlData) + c.Assert(err, IsNil) + sqls = append(sqls, SQLChecker{events: evs, isDML: true, skipped: true}) + + evs, _, err = s.eventsGenerator.GenDropDatabaseEvents("foo1") + c.Assert(err, IsNil) + sqls = append(sqls, SQLChecker{events: evs, isDML: false, skipped: false}) + evs, _, err = s.eventsGenerator.GenCreateDatabaseEvents("foo1") + c.Assert(err, IsNil) + sqls = append(sqls, SQLChecker{events: evs, isDML: false, skipped: false}) + evs, _, err = s.eventsGenerator.GenCreateTableEvents("foo1", "create table foo1.bar1(id int)") + c.Assert(err, IsNil) + sqls = append(sqls, SQLChecker{events: evs, isDML: false, skipped: false}) + insertRows = [][]interface{}{{int32(1)}} + dmlData = []*event.DMLData{ + { + TableID: 8, + Schema: "foo1", + Table: "bar1", + ColumnType: columnType, + Rows: insertRows, + }, + } + eventType = replication.WRITE_ROWS_EVENTv2 + evs, _, err = s.eventsGenerator.GenDMLEvents(eventType, dmlData) + c.Assert(err, IsNil) + sqls = append(sqls, SQLChecker{events: evs, isDML: true, skipped: false}) + + updateRows = [][]interface{}{{int32(2)}, {int32(1)}} + dmlData = []*event.DMLData{ + { + TableID: 8, + Schema: "foo1", + Table: "bar1", + ColumnType: columnType, + Rows: updateRows, + }, + } + eventType = replication.UPDATE_ROWS_EVENTv2 + evs, _, err = s.eventsGenerator.GenDMLEvents(eventType, dmlData) + c.Assert(err, IsNil) + sqls = append(sqls, SQLChecker{events: evs, isDML: true, skipped: true}) + deleteRows = [][]interface{}{{int32(2)}} + dmlData = []*event.DMLData{ + { + TableID: 8, + Schema: "foo1", + Table: "bar1", + ColumnType: columnType, + Rows: deleteRows, + }, + } + eventType = replication.DELETE_ROWS_EVENTv2 + evs, _, err = s.eventsGenerator.GenDMLEvents(eventType, dmlData) + c.Assert(err, IsNil) + sqls = append(sqls, SQLChecker{events: evs, isDML: true, skipped: true}) + evs, _, err = s.eventsGenerator.GenCreateTableEvents("foo1", "create table foo1.bar2(id int)") + c.Assert(err, IsNil) + sqls = append(sqls, SQLChecker{events: evs, isDML: false, skipped: false}) + insertRows = [][]interface{}{{int32(1)}} + dmlData = []*event.DMLData{ + { + TableID: 8, + Schema: "foo1", + Table: "bar2", + ColumnType: columnType, + Rows: insertRows, + }, + } + eventType = replication.WRITE_ROWS_EVENTv2 + evs, _, err = s.eventsGenerator.GenDMLEvents(eventType, dmlData) + c.Assert(err, IsNil) + sqls = append(sqls, SQLChecker{events: evs, isDML: true, skipped: false}) + updateRows = [][]interface{}{{int32(2)}, {int32(1)}} + dmlData = []*event.DMLData{ + { + TableID: 8, + Schema: "foo1", + Table: "bar2", + ColumnType: columnType, + Rows: updateRows, + }, + } + eventType = replication.UPDATE_ROWS_EVENTv2 + evs, _, err = s.eventsGenerator.GenDMLEvents(eventType, dmlData) + c.Assert(err, IsNil) + sqls = append(sqls, SQLChecker{events: evs, isDML: true, skipped: true}) + deleteRows = [][]interface{}{{int32(2)}} + dmlData = []*event.DMLData{ + { + TableID: 8, + Schema: "foo1", + Table: "bar2", + ColumnType: columnType, + Rows: deleteRows, + }, } + eventType = replication.DELETE_ROWS_EVENTv2 + evs, _, err = s.eventsGenerator.GenDMLEvents(eventType, dmlData) + c.Assert(err, IsNil) + sqls = append(sqls, SQLChecker{events: evs, isDML: true, skipped: true}) - p, err := utils.GetParser(s.db, false) + db, mock, err := sqlmock.New() + mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE"). + WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). + AddRow("sql_mode", "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION")) + p, err := utils.GetParser(db, false) c.Assert(err, IsNil) syncer := NewSyncer(s.cfg) @@ -567,32 +892,27 @@ func (s *testSyncerSuite) TestSkipDML(c *C) { syncer.binlogFilter, err = bf.NewBinlogEvent(false, s.cfg.FilterRules) c.Assert(err, IsNil) - i := 0 - for { - if i >= len(sqls) { - break - } - e, err := s.streamer.GetEvent(context.Background()) - c.Assert(err, IsNil) - switch ev := e.Event.(type) { - case *replication.QueryEvent: - stmt, err := p.ParseOneStmt(string(ev.Query), "", "") - c.Assert(err, IsNil) - _, isDDL := stmt.(ast.DDLNode) - if !isDDL { + for _, sql := range sqls { + events := sql.events + for _, e := range events { + switch ev := e.Event.(type) { + case *replication.QueryEvent: + stmt, err := p.ParseOneStmt(string(ev.Query), "", "") + c.Assert(err, IsNil) + _, isDDL := stmt.(ast.DDLNode) + if !isDDL { + continue + } + + case *replication.RowsEvent: + r, err := syncer.skipDMLEvent(string(ev.Table.Schema), string(ev.Table.Table), e.Header.EventType) + c.Assert(err, IsNil) + c.Assert(r, Equals, sql.skipped) + default: continue } - - case *replication.RowsEvent: - r, err := syncer.skipDMLEvent(string(ev.Table.Schema), string(ev.Table.Table), e.Header.EventType) - c.Assert(err, IsNil) - c.Assert(r, Equals, sqls[i].skipped) - default: - continue } - i++ } - s.catchUpBinlog() } func (s *testSyncerSuite) TestColumnMapping(c *C) { @@ -613,55 +933,110 @@ func (s *testSyncerSuite) TestColumnMapping(c *C) { }, } - createTableSQLs := []string{ - "create database if not exists stest_3", - "create table if not exists stest_3.log(id varchar(45))", - "create table if not exists stest_3.t_2(name varchar(45), id bigint)", - "create table if not exists stest_3.a(id int)", - } + s.resetEventsGenerator() + + createEvents := make([]*replication.BinlogEvent, 0, 4) + dmlEvents := make([]*replication.BinlogEvent, 0, 15) + dropEvents := make([]*replication.BinlogEvent, 0, 4) - dmls := []struct { - sql string + //create db and tables + evs, _, err := s.eventsGenerator.GenCreateDatabaseEvents("stest_3") + c.Assert(err, IsNil) + createEvents = append(createEvents, evs...) + evs, _, err = s.eventsGenerator.GenCreateTableEvents("stest_3", "create table stest_3.log(id varchar(45))") + c.Assert(err, IsNil) + createEvents = append(createEvents, evs...) + evs, _, err = s.eventsGenerator.GenCreateTableEvents("stest_3", "create table stest_3.t_2(name varchar(45), id bigint)") + c.Assert(err, IsNil) + createEvents = append(createEvents, evs...) + evs, _, err = s.eventsGenerator.GenCreateTableEvents("stest_3", "create table stest_3.a(id int)") + c.Assert(err, IsNil) + createEvents = append(createEvents, evs...) + + // dmls + type dml struct { + events []*replication.BinlogEvent column []string data []interface{} - }{ - {"insert into stest_3.t_2(name, id) values (\"ian\", 10)", []string{"name", "id"}, []interface{}{"ian", int64(1<<59 | 3<<52 | 2<<44 | 10)}}, - {"insert into stest_3.log(id) values (\"10\")", []string{"id"}, []interface{}{"test:10"}}, - {"insert into stest_3.a(id) values (10)", []string{"id"}, []interface{}{int32(10)}}, } - - dropTableSQLs := []string{ - "drop table stest_3.log,stest_3.t_2,stest_3.a", - "drop database stest_3", + dmls := make([]dml, 0, 3) + columnType := []byte{mysql.MYSQL_TYPE_STRING, mysql.MYSQL_TYPE_LONGLONG} + insertRows := [][]interface{}{{"ian", int64(10)}} + dmlData := []*event.DMLData{ + { + TableID: 8, + Schema: "stest_3", + Table: "t_2", + ColumnType: columnType, + Rows: insertRows, + }, } + eventType := replication.WRITE_ROWS_EVENTv2 + evs, _, err = s.eventsGenerator.GenDMLEvents(eventType, dmlData) + c.Assert(err, IsNil) + dmls = append(dmls, dml{events: evs, column: []string{"name", "id"}, data: []interface{}{"ian", int64(1<<59 | 3<<52 | 2<<44 | 10)}}) - for _, sql := range createTableSQLs { - s.db.Exec(sql) + columnType = []byte{mysql.MYSQL_TYPE_STRING} + insertRows = [][]interface{}{{"10"}} + dmlData = []*event.DMLData{ + { + TableID: 9, + Schema: "stest_3", + Table: "log", + ColumnType: columnType, + Rows: insertRows, + }, } + evs, _, err = s.eventsGenerator.GenDMLEvents(eventType, dmlData) + c.Assert(err, IsNil) + dmls = append(dmls, dml{events: evs, column: []string{"id"}, data: []interface{}{"test:10"}}) - for i := range dmls { - s.db.Exec(dmls[i].sql) + columnType = []byte{mysql.MYSQL_TYPE_LONG} + insertRows = [][]interface{}{{int32(10)}} + dmlData = []*event.DMLData{ + { + TableID: 10, + Schema: "stest_3", + Table: "a", + ColumnType: columnType, + Rows: insertRows, + }, } - - for _, sql := range dropTableSQLs { - s.db.Exec(sql) + evs, _, err = s.eventsGenerator.GenDMLEvents(eventType, dmlData) + c.Assert(err, IsNil) + dmls = append(dmls, dml{events: evs, column: []string{"id"}, data: []interface{}{int32(10)}}) + for _, dml := range dmls { + dmlEvents = append(dmlEvents, dml.events...) } - p, err := utils.GetParser(s.db, false) + // drop tables and db + evs, _, err = s.eventsGenerator.GenDropTableEvents("stest_3", "log") + c.Assert(err, IsNil) + dropEvents = append(dropEvents, evs...) + evs, _, err = s.eventsGenerator.GenDropTableEvents("stest_3", "t_2") + c.Assert(err, IsNil) + dropEvents = append(dropEvents, evs...) + evs, _, err = s.eventsGenerator.GenDropTableEvents("stest_3", "a") + c.Assert(err, IsNil) + dropEvents = append(dropEvents, evs...) + evs, _, err = s.eventsGenerator.GenDropDatabaseEvents("stest_3") + c.Assert(err, IsNil) + dropEvents = append(dropEvents, evs...) + + db, mock, err := sqlmock.New() + mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE"). + WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). + AddRow("sql_mode", "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION")) + p, err := utils.GetParser(db, false) c.Assert(err, IsNil) mapping, err := cm.NewMapping(false, rules) c.Assert(err, IsNil) - totalEvent := len(dmls) + len(createTableSQLs) + len(dropTableSQLs) - i := 0 + allEvents := append(createEvents, dmlEvents...) + allEvents = append(allEvents, dropEvents...) dmlIndex := 0 - for { - if i == totalEvent { - break - } - e, err := s.streamer.GetEvent(context.Background()) - c.Assert(err, IsNil) + for _, e := range allEvents { switch ev := e.Event.(type) { case *replication.QueryEvent: stmt, err := p.ParseOneStmt(string(ev.Query), "", "") @@ -678,9 +1053,7 @@ func (s *testSyncerSuite) TestColumnMapping(c *C) { default: continue } - i++ } - s.catchUpBinlog() } func (s *testSyncerSuite) TestTimezone(c *C) { From 0662f6c5e130b622635a2f37a10eaf18ef9bec00 Mon Sep 17 00:00:00 2001 From: luancheng Date: Mon, 29 Jul 2019 17:11:57 +0800 Subject: [PATCH 2/6] abstract generate mock events --- syncer/syncer_test.go | 631 ++++++++++++------------------------------ 1 file changed, 182 insertions(+), 449 deletions(-) diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index b125a5fbcf..cdd0edd7b2 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -51,6 +51,23 @@ func TestSuite(t *testing.T) { TestingT(t) } +type mockBinlogEvents []mockBinlogEvent +type mockBinlogEvent struct { + typ int + args []interface{} +} + +const ( + DBCreate = iota + DBDrop + TableCreate + TableDrop + + Write + Update + Delete +) + type testSyncerSuite struct { db *sql.DB syncer *replication.BinlogSyncer @@ -90,6 +107,52 @@ func (s *testSyncerSuite) SetUpSuite(c *C) { c.Assert(err, IsNil) } +func (s *testSyncerSuite) generateEvents(binlogEvents mockBinlogEvents, c *C) []*replication.BinlogEvent { + events := make([]*replication.BinlogEvent, 0, 1024) + for _, e := range binlogEvents { + switch e.typ { + case DBCreate: + evs, _, err := s.eventsGenerator.GenCreateDatabaseEvents(e.args[0].(string)) + c.Assert(err, IsNil) + events = append(events, evs...) + case DBDrop: + evs, _, err := s.eventsGenerator.GenDropDatabaseEvents(e.args[0].(string)) + c.Assert(err, IsNil) + events = append(events, evs...) + case TableCreate: + evs, _, err := s.eventsGenerator.GenCreateTableEvents(e.args[0].(string), e.args[1].(string)) + c.Assert(err, IsNil) + events = append(events, evs...) + case TableDrop: + evs, _, err := s.eventsGenerator.GenDropTableEvents(e.args[0].(string), e.args[1].(string)) + c.Assert(err, IsNil) + events = append(events, evs...) + + case Write, Update, Delete: + dmlData := []*event.DMLData{ + { + TableID: e.args[0].(uint64), + Schema: e.args[1].(string), + Table: e.args[2].(string), + ColumnType: e.args[3].([]byte), + Rows: e.args[4].([][]interface{}), + }, + } + eventType := replication.WRITE_ROWS_EVENTv2 + if e.typ == Update { + eventType = replication.UPDATE_ROWS_EVENTv2 + } else if e.typ == Delete { + eventType = replication.DELETE_ROWS_EVENTv2 + } + evs, _, err := s.eventsGenerator.GenDMLEvents(eventType, dmlData) + c.Assert(err, IsNil) + events = append(events, evs...) + } + + } + return events +} + func (s *testSyncerSuite) resetEventsGenerator() { previousGTIDSetStr := "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-14,406a3f61-690d-11e7-87c5-6c92bf46f384:1-94321383" previousGTIDSet, err := gtid.ParserGTID(s.cfg.Flavor, previousGTIDSetStr) @@ -244,115 +307,36 @@ func (s *testSyncerSuite) TestSelectTable(c *C) { }, } s.resetEventsGenerator() - allEvents := make([]*replication.BinlogEvent, 0, 24) - - evs, _, err := s.eventsGenerator.GenCreateDatabaseEvents("s1") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenCreateTableEvents("s1", "create table s1.log(id int)") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenDropDatabaseEvents("s1") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - - evs, _, err = s.eventsGenerator.GenCreateTableEvents("mysql", "create table mysql.test(id int)") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenDropTableEvents("mysql", "test") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenCreateDatabaseEvents("stest") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenCreateTableEvents("stest", "create table stest.log(id int)") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenCreateTableEvents("stest", "create table stest.t(id int)") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenCreateTableEvents("stest", "create table stest.log2(id int)") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - - columnType := []byte{mysql.MYSQL_TYPE_LONG} - insertRows := [][]interface{}{{int32(10)}} - dmlData := []*event.DMLData{ - { - TableID: 8, - Schema: "stest", - Table: "t", - ColumnType: columnType, - Rows: insertRows, - }, - } - eventType := replication.WRITE_ROWS_EVENTv2 - evs, _, err = s.eventsGenerator.GenDMLEvents(eventType, dmlData) - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - - dmlData = []*event.DMLData{ - { - TableID: 9, - Schema: "stest", - Table: "log", - ColumnType: columnType, - Rows: insertRows, - }, - } - evs, _, err = s.eventsGenerator.GenDMLEvents(eventType, dmlData) - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - - dmlData = []*event.DMLData{ - { - TableID: 10, - Schema: "stest", - Table: "log2", - ColumnType: columnType, - Rows: insertRows, - }, - } - evs, _, err = s.eventsGenerator.GenDMLEvents(eventType, dmlData) - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenDropTableEvents("stest", "log") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenDropTableEvents("stest", "t") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenDropTableEvents("stest", "log2") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenDropDatabaseEvents("stest") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - - evs, _, err = s.eventsGenerator.GenCreateDatabaseEvents("t2") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenCreateTableEvents("t2", "create table t2.log(id int)") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenCreateTableEvents("t2", "create table t2.log1(id int)") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenDropTableEvents("t2", "log") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenDropDatabaseEvents("t2") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenCreateDatabaseEvents("ptest1") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenCreateTableEvents("ptest1", "create table ptest1.t1(id int)") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenDropDatabaseEvents("ptest1") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) + events := make([]mockBinlogEvent, 0, 24) + events = append(events, mockBinlogEvent{typ: DBCreate, args: []interface{}{"s1"}}) + events = append(events, mockBinlogEvent{typ: TableCreate, args: []interface{}{"s1", "create table s1.log(id int)"}}) + events = append(events, mockBinlogEvent{typ: DBDrop, args: []interface{}{"s1"}}) + + events = append(events, mockBinlogEvent{typ: TableCreate, args: []interface{}{"mysql", "create table mysql.test(id int)"}}) + events = append(events, mockBinlogEvent{typ: TableDrop, args: []interface{}{"mysql", "test"}}) + events = append(events, mockBinlogEvent{typ: DBCreate, args: []interface{}{"stest"}}) + events = append(events, mockBinlogEvent{typ: TableCreate, args: []interface{}{"stest", "create table stest.log(id int)"}}) + events = append(events, mockBinlogEvent{typ: TableCreate, args: []interface{}{"stest", "create table stest.t(id int)"}}) + events = append(events, mockBinlogEvent{typ: TableCreate, args: []interface{}{"stest", "create table stest.log2(id int)"}}) + events = append(events, mockBinlogEvent{typ: Write, args: []interface{}{uint64(8), "stest", "t", []byte{mysql.MYSQL_TYPE_LONG}, [][]interface{}{{int32(10)}}}}) + events = append(events, mockBinlogEvent{typ: Write, args: []interface{}{uint64(9), "stest", "log", []byte{mysql.MYSQL_TYPE_LONG}, [][]interface{}{{int32(10)}}}}) + events = append(events, mockBinlogEvent{typ: Write, args: []interface{}{uint64(10), "stest", "log2", []byte{mysql.MYSQL_TYPE_LONG}, [][]interface{}{{int32(10)}}}}) + events = append(events, mockBinlogEvent{typ: TableDrop, args: []interface{}{"stest", "log"}}) + events = append(events, mockBinlogEvent{typ: TableDrop, args: []interface{}{"stest", "t"}}) + events = append(events, mockBinlogEvent{typ: TableDrop, args: []interface{}{"stest", "log2"}}) + events = append(events, mockBinlogEvent{typ: DBDrop, args: []interface{}{"stest"}}) + + events = append(events, mockBinlogEvent{typ: DBCreate, args: []interface{}{"t2"}}) + events = append(events, mockBinlogEvent{typ: TableCreate, args: []interface{}{"t2", "create table t2.log(id int)"}}) + events = append(events, mockBinlogEvent{typ: TableCreate, args: []interface{}{"t2", "create table t2.log1(id int)"}}) + events = append(events, mockBinlogEvent{typ: TableDrop, args: []interface{}{"t2", "log"}}) + events = append(events, mockBinlogEvent{typ: DBDrop, args: []interface{}{"t2"}}) + + events = append(events, mockBinlogEvent{typ: DBCreate, args: []interface{}{"ptest1"}}) + events = append(events, mockBinlogEvent{typ: TableCreate, args: []interface{}{"ptest1", "create table ptest1.t1(id int)"}}) + events = append(events, mockBinlogEvent{typ: DBDrop, args: []interface{}{"ptest1"}}) + + allEvents := s.generateEvents(events, c) res := [][]bool{ {true}, @@ -378,6 +362,7 @@ func (s *testSyncerSuite) TestSelectTable(c *C) { {true}, {true}, {false}, + {false}, {false}, {false}, @@ -397,6 +382,7 @@ func (s *testSyncerSuite) TestSelectTable(c *C) { switch ev := e.Event.(type) { case *replication.QueryEvent: query := string(ev.Query) + result, err := syncer.parseDDLSQL(query, p, string(ev.Schema)) c.Assert(err, IsNil) if !result.isDDL { @@ -427,56 +413,29 @@ func (s *testSyncerSuite) TestSelectTable(c *C) { } i++ } - s.catchUpBinlog() } func (s *testSyncerSuite) TestIgnoreDB(c *C) { - s.resetMaster() - s.resetBinlogSyncer() - s.cfg.BWList = &filter.Rules{ IgnoreDBs: []string{"~^b.*", "s1", "stest"}, } s.resetEventsGenerator() - allEvents := make([]*replication.BinlogEvent, 0, 12) - - evs, _, err := s.eventsGenerator.GenCreateDatabaseEvents("s1") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenDropDatabaseEvents("s1") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenCreateDatabaseEvents("s2") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenDropDatabaseEvents("s2") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenCreateDatabaseEvents("btest") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenDropDatabaseEvents("btest") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenCreateDatabaseEvents("b1") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenDropDatabaseEvents("b1") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenCreateDatabaseEvents("stest") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenDropDatabaseEvents("stest") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenCreateDatabaseEvents("st") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenDropDatabaseEvents("st") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) + events := make([]mockBinlogEvent, 0, 12) + events = append(events, mockBinlogEvent{typ: DBCreate, args: []interface{}{"s1"}}) + events = append(events, mockBinlogEvent{typ: DBDrop, args: []interface{}{"s1"}}) + events = append(events, mockBinlogEvent{typ: DBCreate, args: []interface{}{"s2"}}) + events = append(events, mockBinlogEvent{typ: DBDrop, args: []interface{}{"s2"}}) + events = append(events, mockBinlogEvent{typ: DBCreate, args: []interface{}{"btest"}}) + events = append(events, mockBinlogEvent{typ: DBDrop, args: []interface{}{"btest"}}) + events = append(events, mockBinlogEvent{typ: DBCreate, args: []interface{}{"b1"}}) + events = append(events, mockBinlogEvent{typ: DBDrop, args: []interface{}{"b1"}}) + events = append(events, mockBinlogEvent{typ: DBCreate, args: []interface{}{"stest"}}) + events = append(events, mockBinlogEvent{typ: DBDrop, args: []interface{}{"stest"}}) + events = append(events, mockBinlogEvent{typ: DBCreate, args: []interface{}{"st"}}) + events = append(events, mockBinlogEvent{typ: DBDrop, args: []interface{}{"st"}}) + + allEvents := s.generateEvents(events, c) res := []bool{true, true, false, false, true, true, true, true, true, true, false, false} @@ -518,103 +477,32 @@ func (s *testSyncerSuite) TestIgnoreTable(c *C) { } s.resetEventsGenerator() - allEvents := make([]*replication.BinlogEvent, 0, 20) - - evs, _, err := s.eventsGenerator.GenCreateDatabaseEvents("s1") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenCreateTableEvents("s1", "create table s1.log(id int)") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenDropDatabaseEvents("s1") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - - evs, _, err = s.eventsGenerator.GenCreateTableEvents("mysql", "create table mysql.test(id int)") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenDropTableEvents("mysql", "test") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenCreateDatabaseEvents("stest") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenCreateTableEvents("stest", "create table stest.log(id int)") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenCreateTableEvents("stest", "create table stest.t(id int)") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenCreateTableEvents("stest", "create table stest.log2(id int)") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - columnType := []byte{mysql.MYSQL_TYPE_LONG} - insertRows := [][]interface{}{{int32(10)}} - dmlData := []*event.DMLData{ - { - TableID: 8, - Schema: "stest", - Table: "t", - ColumnType: columnType, - Rows: insertRows, - }, - } - eventType := replication.WRITE_ROWS_EVENTv2 - evs, _, err = s.eventsGenerator.GenDMLEvents(eventType, dmlData) - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - dmlData = []*event.DMLData{ - { - TableID: 9, - Schema: "stest", - Table: "log", - ColumnType: columnType, - Rows: insertRows, - }, - } - evs, _, err = s.eventsGenerator.GenDMLEvents(eventType, dmlData) - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - dmlData = []*event.DMLData{ - { - TableID: 10, - Schema: "stest", - Table: "log2", - ColumnType: columnType, - Rows: insertRows, - }, - } - evs, _, err = s.eventsGenerator.GenDMLEvents(eventType, dmlData) - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenDropTableEvents("stest", "log") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenDropTableEvents("stest", "t") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenDropTableEvents("stest", "log2") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenDropDatabaseEvents("stest") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - - evs, _, err = s.eventsGenerator.GenCreateDatabaseEvents("t2") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenCreateTableEvents("t2", "create table t2.log(id int)") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenCreateTableEvents("t2", "create table t2.log1(id int)") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenDropTableEvents("t2", "log") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) - evs, _, err = s.eventsGenerator.GenDropDatabaseEvents("t2") - c.Assert(err, IsNil) - allEvents = append(allEvents, evs...) + events := make([]mockBinlogEvent, 0, 12) + events = append(events, mockBinlogEvent{typ: DBCreate, args: []interface{}{"s1"}}) + events = append(events, mockBinlogEvent{typ: TableCreate, args: []interface{}{"s1", "create table s1.log(id int)"}}) + events = append(events, mockBinlogEvent{typ: DBDrop, args: []interface{}{"s1"}}) + events = append(events, mockBinlogEvent{typ: TableCreate, args: []interface{}{"mysql", "create table mysql.test(id int)"}}) + events = append(events, mockBinlogEvent{typ: TableDrop, args: []interface{}{"mysql", "test"}}) + + events = append(events, mockBinlogEvent{typ: DBCreate, args: []interface{}{"stest"}}) + events = append(events, mockBinlogEvent{typ: TableCreate, args: []interface{}{"stest", "create table stest.log(id int)"}}) + events = append(events, mockBinlogEvent{typ: TableCreate, args: []interface{}{"stest", "create table stest.t(id int)"}}) + events = append(events, mockBinlogEvent{typ: TableCreate, args: []interface{}{"stest", "create table stest.log2(id int)"}}) + + events = append(events, mockBinlogEvent{typ: Write, args: []interface{}{uint64(8), "stest", "t", []byte{mysql.MYSQL_TYPE_LONG}, [][]interface{}{{int32(10)}}}}) + events = append(events, mockBinlogEvent{typ: Write, args: []interface{}{uint64(9), "stest", "log", []byte{mysql.MYSQL_TYPE_LONG}, [][]interface{}{{int32(10)}}}}) + events = append(events, mockBinlogEvent{typ: Write, args: []interface{}{uint64(10), "stest", "log2", []byte{mysql.MYSQL_TYPE_LONG}, [][]interface{}{{int32(10)}}}}) + events = append(events, mockBinlogEvent{typ: TableDrop, args: []interface{}{"stest", "log"}}) + events = append(events, mockBinlogEvent{typ: TableDrop, args: []interface{}{"stest", "t"}}) + events = append(events, mockBinlogEvent{typ: TableDrop, args: []interface{}{"stest", "log2"}}) + events = append(events, mockBinlogEvent{typ: DBDrop, args: []interface{}{"stest"}}) + + events = append(events, mockBinlogEvent{typ: DBCreate, args: []interface{}{"t2"}}) + events = append(events, mockBinlogEvent{typ: TableCreate, args: []interface{}{"t2", "create table t2.log(id int)"}}) + events = append(events, mockBinlogEvent{typ: TableCreate, args: []interface{}{"t2", "create table t2.log1(id int)"}}) + events = append(events, mockBinlogEvent{typ: TableDrop, args: []interface{}{"t2", "log"}}) + events = append(events, mockBinlogEvent{typ: DBDrop, args: []interface{}{"t2"}}) + allEvents := s.generateEvents(events, c) res := [][]bool{ {false}, @@ -727,156 +615,49 @@ func (s *testSyncerSuite) TestSkipDML(c *C) { sqls := make([]SQLChecker, 0, 16) - evs, _, err := s.eventsGenerator.GenDropDatabaseEvents("foo") - c.Assert(err, IsNil) + evs := s.generateEvents([]mockBinlogEvent{{DBCreate, []interface{}{"foo"}}}, c) sqls = append(sqls, SQLChecker{events: evs, isDML: false, skipped: false}) - evs, _, err = s.eventsGenerator.GenCreateDatabaseEvents("foo") - c.Assert(err, IsNil) - sqls = append(sqls, SQLChecker{events: evs, isDML: false, skipped: false}) - evs, _, err = s.eventsGenerator.GenCreateTableEvents("foo", "create table foo.bar(id int)") - c.Assert(err, IsNil) + evs = s.generateEvents([]mockBinlogEvent{{TableCreate, []interface{}{"foo", "create table foo.bar(id int)"}}}, c) sqls = append(sqls, SQLChecker{events: evs, isDML: false, skipped: false}) - columnType := []byte{mysql.MYSQL_TYPE_LONG} - insertRows := [][]interface{}{{int32(1)}} - dmlData := []*event.DMLData{ - { - TableID: 8, - Schema: "foo", - Table: "bar", - ColumnType: columnType, - Rows: insertRows, - }, - } - eventType := replication.WRITE_ROWS_EVENTv2 - evs, _, err = s.eventsGenerator.GenDMLEvents(eventType, dmlData) - c.Assert(err, IsNil) + + evs = s.generateEvents([]mockBinlogEvent{{Write, []interface{}{uint64(8), "foo", "bar", []byte{mysql.MYSQL_TYPE_LONG}, [][]interface{}{{int32(1)}}}}}, c) sqls = append(sqls, SQLChecker{events: evs, isDML: true, skipped: false}) - updateRows := [][]interface{}{{int32(2)}, {int32(1)}} - dmlData = []*event.DMLData{ - { - TableID: 8, - Schema: "foo", - Table: "bar", - ColumnType: columnType, - Rows: updateRows, - }, - } - eventType = replication.UPDATE_ROWS_EVENTv2 - evs, _, err = s.eventsGenerator.GenDMLEvents(eventType, dmlData) - c.Assert(err, IsNil) + + evs = s.generateEvents([]mockBinlogEvent{{Update, []interface{}{uint64(8), "foo", "bar", []byte{mysql.MYSQL_TYPE_LONG}, [][]interface{}{{int32(2)}, {int32(1)}}}}}, c) sqls = append(sqls, SQLChecker{events: evs, isDML: true, skipped: true}) - deleteRows := [][]interface{}{{int32(2)}} - dmlData = []*event.DMLData{ - { - TableID: 8, - Schema: "foo", - Table: "bar", - ColumnType: columnType, - Rows: deleteRows, - }, - } - eventType = replication.DELETE_ROWS_EVENTv2 - evs, _, err = s.eventsGenerator.GenDMLEvents(eventType, dmlData) - c.Assert(err, IsNil) + + evs = s.generateEvents([]mockBinlogEvent{{Delete, []interface{}{uint64(8), "foo", "bar", []byte{mysql.MYSQL_TYPE_LONG}, [][]interface{}{{int32(2)}}}}}, c) sqls = append(sqls, SQLChecker{events: evs, isDML: true, skipped: true}) - evs, _, err = s.eventsGenerator.GenDropDatabaseEvents("foo1") - c.Assert(err, IsNil) + evs = s.generateEvents([]mockBinlogEvent{{DBDrop, []interface{}{"foo1"}}}, c) sqls = append(sqls, SQLChecker{events: evs, isDML: false, skipped: false}) - evs, _, err = s.eventsGenerator.GenCreateDatabaseEvents("foo1") - c.Assert(err, IsNil) + + evs = s.generateEvents([]mockBinlogEvent{{DBCreate, []interface{}{"foo1"}}}, c) sqls = append(sqls, SQLChecker{events: evs, isDML: false, skipped: false}) - evs, _, err = s.eventsGenerator.GenCreateTableEvents("foo1", "create table foo1.bar1(id int)") - c.Assert(err, IsNil) + + evs = s.generateEvents([]mockBinlogEvent{{TableCreate, []interface{}{"foo1", "create table foo1.bar1(id int)"}}}, c) sqls = append(sqls, SQLChecker{events: evs, isDML: false, skipped: false}) - insertRows = [][]interface{}{{int32(1)}} - dmlData = []*event.DMLData{ - { - TableID: 8, - Schema: "foo1", - Table: "bar1", - ColumnType: columnType, - Rows: insertRows, - }, - } - eventType = replication.WRITE_ROWS_EVENTv2 - evs, _, err = s.eventsGenerator.GenDMLEvents(eventType, dmlData) - c.Assert(err, IsNil) + + evs = s.generateEvents([]mockBinlogEvent{{Write, []interface{}{uint64(9), "foo1", "bar1", []byte{mysql.MYSQL_TYPE_LONG}, [][]interface{}{{int32(1)}}}}}, c) sqls = append(sqls, SQLChecker{events: evs, isDML: true, skipped: false}) - updateRows = [][]interface{}{{int32(2)}, {int32(1)}} - dmlData = []*event.DMLData{ - { - TableID: 8, - Schema: "foo1", - Table: "bar1", - ColumnType: columnType, - Rows: updateRows, - }, - } - eventType = replication.UPDATE_ROWS_EVENTv2 - evs, _, err = s.eventsGenerator.GenDMLEvents(eventType, dmlData) - c.Assert(err, IsNil) + evs = s.generateEvents([]mockBinlogEvent{{Update, []interface{}{uint64(9), "foo1", "bar1", []byte{mysql.MYSQL_TYPE_LONG}, [][]interface{}{{int32(2)}, {int32(1)}}}}}, c) sqls = append(sqls, SQLChecker{events: evs, isDML: true, skipped: true}) - deleteRows = [][]interface{}{{int32(2)}} - dmlData = []*event.DMLData{ - { - TableID: 8, - Schema: "foo1", - Table: "bar1", - ColumnType: columnType, - Rows: deleteRows, - }, - } - eventType = replication.DELETE_ROWS_EVENTv2 - evs, _, err = s.eventsGenerator.GenDMLEvents(eventType, dmlData) - c.Assert(err, IsNil) + + evs = s.generateEvents([]mockBinlogEvent{{Delete, []interface{}{uint64(9), "foo1", "bar1", []byte{mysql.MYSQL_TYPE_LONG}, [][]interface{}{{int32(2)}}}}}, c) sqls = append(sqls, SQLChecker{events: evs, isDML: true, skipped: true}) - evs, _, err = s.eventsGenerator.GenCreateTableEvents("foo1", "create table foo1.bar2(id int)") - c.Assert(err, IsNil) + + evs = s.generateEvents([]mockBinlogEvent{{TableCreate, []interface{}{"foo1", "create table foo1.bar2(id int)"}}}, c) sqls = append(sqls, SQLChecker{events: evs, isDML: false, skipped: false}) - insertRows = [][]interface{}{{int32(1)}} - dmlData = []*event.DMLData{ - { - TableID: 8, - Schema: "foo1", - Table: "bar2", - ColumnType: columnType, - Rows: insertRows, - }, - } - eventType = replication.WRITE_ROWS_EVENTv2 - evs, _, err = s.eventsGenerator.GenDMLEvents(eventType, dmlData) - c.Assert(err, IsNil) + + evs = s.generateEvents([]mockBinlogEvent{{Write, []interface{}{uint64(10), "foo1", "bar2", []byte{mysql.MYSQL_TYPE_LONG}, [][]interface{}{{int32(1)}}}}}, c) sqls = append(sqls, SQLChecker{events: evs, isDML: true, skipped: false}) - updateRows = [][]interface{}{{int32(2)}, {int32(1)}} - dmlData = []*event.DMLData{ - { - TableID: 8, - Schema: "foo1", - Table: "bar2", - ColumnType: columnType, - Rows: updateRows, - }, - } - eventType = replication.UPDATE_ROWS_EVENTv2 - evs, _, err = s.eventsGenerator.GenDMLEvents(eventType, dmlData) - c.Assert(err, IsNil) + + evs = s.generateEvents([]mockBinlogEvent{{Update, []interface{}{uint64(10), "foo1", "bar2", []byte{mysql.MYSQL_TYPE_LONG}, [][]interface{}{{int32(2)}, {int32(1)}}}}}, c) sqls = append(sqls, SQLChecker{events: evs, isDML: true, skipped: true}) - deleteRows = [][]interface{}{{int32(2)}} - dmlData = []*event.DMLData{ - { - TableID: 8, - Schema: "foo1", - Table: "bar2", - ColumnType: columnType, - Rows: deleteRows, - }, - } - eventType = replication.DELETE_ROWS_EVENTv2 - evs, _, err = s.eventsGenerator.GenDMLEvents(eventType, dmlData) - c.Assert(err, IsNil) + + evs = s.generateEvents([]mockBinlogEvent{{Delete, []interface{}{uint64(10), "foo1", "bar2", []byte{mysql.MYSQL_TYPE_LONG}, [][]interface{}{{int32(2)}}}}}, c) sqls = append(sqls, SQLChecker{events: evs, isDML: true, skipped: true}) db, mock, err := sqlmock.New() @@ -935,23 +716,14 @@ func (s *testSyncerSuite) TestColumnMapping(c *C) { s.resetEventsGenerator() - createEvents := make([]*replication.BinlogEvent, 0, 4) - dmlEvents := make([]*replication.BinlogEvent, 0, 15) - dropEvents := make([]*replication.BinlogEvent, 0, 4) - //create db and tables - evs, _, err := s.eventsGenerator.GenCreateDatabaseEvents("stest_3") - c.Assert(err, IsNil) - createEvents = append(createEvents, evs...) - evs, _, err = s.eventsGenerator.GenCreateTableEvents("stest_3", "create table stest_3.log(id varchar(45))") - c.Assert(err, IsNil) - createEvents = append(createEvents, evs...) - evs, _, err = s.eventsGenerator.GenCreateTableEvents("stest_3", "create table stest_3.t_2(name varchar(45), id bigint)") - c.Assert(err, IsNil) - createEvents = append(createEvents, evs...) - evs, _, err = s.eventsGenerator.GenCreateTableEvents("stest_3", "create table stest_3.a(id int)") - c.Assert(err, IsNil) - createEvents = append(createEvents, evs...) + events := make([]mockBinlogEvent, 0, 24) + events = append(events, mockBinlogEvent{typ: DBCreate, args: []interface{}{"stest_3"}}) + events = append(events, mockBinlogEvent{typ: TableCreate, args: []interface{}{"stest_3", "create table stest_3.log(id varchar(45))"}}) + events = append(events, mockBinlogEvent{typ: TableCreate, args: []interface{}{"stest_3", "create table stest_3.t_2(name varchar(45), id bigint)"}}) + events = append(events, mockBinlogEvent{typ: TableCreate, args: []interface{}{"stest_3", "create table stest_3.a(id int)"}}) + + createEvents := s.generateEvents(events, c) // dmls type dml struct { @@ -959,69 +731,30 @@ func (s *testSyncerSuite) TestColumnMapping(c *C) { column []string data []interface{} } + dmls := make([]dml, 0, 3) - columnType := []byte{mysql.MYSQL_TYPE_STRING, mysql.MYSQL_TYPE_LONGLONG} - insertRows := [][]interface{}{{"ian", int64(10)}} - dmlData := []*event.DMLData{ - { - TableID: 8, - Schema: "stest_3", - Table: "t_2", - ColumnType: columnType, - Rows: insertRows, - }, - } - eventType := replication.WRITE_ROWS_EVENTv2 - evs, _, err = s.eventsGenerator.GenDMLEvents(eventType, dmlData) - c.Assert(err, IsNil) + + evs := s.generateEvents([]mockBinlogEvent{{typ: Write, args: []interface{}{uint64(8), "stest_3", "t_2", []byte{mysql.MYSQL_TYPE_STRING, mysql.MYSQL_TYPE_LONG}, [][]interface{}{{"ian", int32(10)}}}}}, c) dmls = append(dmls, dml{events: evs, column: []string{"name", "id"}, data: []interface{}{"ian", int64(1<<59 | 3<<52 | 2<<44 | 10)}}) - columnType = []byte{mysql.MYSQL_TYPE_STRING} - insertRows = [][]interface{}{{"10"}} - dmlData = []*event.DMLData{ - { - TableID: 9, - Schema: "stest_3", - Table: "log", - ColumnType: columnType, - Rows: insertRows, - }, - } - evs, _, err = s.eventsGenerator.GenDMLEvents(eventType, dmlData) - c.Assert(err, IsNil) + evs = s.generateEvents([]mockBinlogEvent{{typ: Write, args: []interface{}{uint64(9), "stest_3", "log", []byte{mysql.MYSQL_TYPE_STRING}, [][]interface{}{{"10"}}}}}, c) dmls = append(dmls, dml{events: evs, column: []string{"id"}, data: []interface{}{"test:10"}}) - columnType = []byte{mysql.MYSQL_TYPE_LONG} - insertRows = [][]interface{}{{int32(10)}} - dmlData = []*event.DMLData{ - { - TableID: 10, - Schema: "stest_3", - Table: "a", - ColumnType: columnType, - Rows: insertRows, - }, - } - evs, _, err = s.eventsGenerator.GenDMLEvents(eventType, dmlData) - c.Assert(err, IsNil) + evs = s.generateEvents([]mockBinlogEvent{{typ: Write, args: []interface{}{uint64(10), "stest_3", "a", []byte{mysql.MYSQL_TYPE_LONG}, [][]interface{}{{int32(10)}}}}}, c) dmls = append(dmls, dml{events: evs, column: []string{"id"}, data: []interface{}{int32(10)}}) + + dmlEvents := make([]*replication.BinlogEvent, 0, 15) for _, dml := range dmls { dmlEvents = append(dmlEvents, dml.events...) } // drop tables and db - evs, _, err = s.eventsGenerator.GenDropTableEvents("stest_3", "log") - c.Assert(err, IsNil) - dropEvents = append(dropEvents, evs...) - evs, _, err = s.eventsGenerator.GenDropTableEvents("stest_3", "t_2") - c.Assert(err, IsNil) - dropEvents = append(dropEvents, evs...) - evs, _, err = s.eventsGenerator.GenDropTableEvents("stest_3", "a") - c.Assert(err, IsNil) - dropEvents = append(dropEvents, evs...) - evs, _, err = s.eventsGenerator.GenDropDatabaseEvents("stest_3") - c.Assert(err, IsNil) - dropEvents = append(dropEvents, evs...) + events = events[:0] + events = append(events, mockBinlogEvent{typ: TableDrop, args: []interface{}{"stest_3", "log"}}) + events = append(events, mockBinlogEvent{typ: TableDrop, args: []interface{}{"stest_3", "t_2"}}) + events = append(events, mockBinlogEvent{typ: TableDrop, args: []interface{}{"stest_3", "a"}}) + events = append(events, mockBinlogEvent{typ: DBDrop, args: []interface{}{"stest_3"}}) + dropEvents := s.generateEvents(events, c) db, mock, err := sqlmock.New() mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE"). From 01e049217e5bc89f35e9ae1e266e4050b1648851 Mon Sep 17 00:00:00 2001 From: luancheng Date: Mon, 29 Jul 2019 17:20:34 +0800 Subject: [PATCH 3/6] failNow when generate mock events error --- syncer/syncer_test.go | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index cdd0edd7b2..256e7fb78c 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -101,7 +101,7 @@ func (s *testSyncerSuite) SetUpSuite(c *C) { s.resetMaster() s.resetBinlogSyncer() - s.resetEventsGenerator() + s.resetEventsGenerator(c) _, err = s.db.Exec("SET GLOBAL binlog_format = 'ROW';") c.Assert(err, IsNil) @@ -153,17 +153,19 @@ func (s *testSyncerSuite) generateEvents(binlogEvents mockBinlogEvents, c *C) [] return events } -func (s *testSyncerSuite) resetEventsGenerator() { +func (s *testSyncerSuite) resetEventsGenerator(c *C) { previousGTIDSetStr := "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-14,406a3f61-690d-11e7-87c5-6c92bf46f384:1-94321383" previousGTIDSet, err := gtid.ParserGTID(s.cfg.Flavor, previousGTIDSetStr) if err != nil { - log.L().Fatal("", zap.Error(err)) + c.Fatal("", zap.Error(err)) + c.FailNow() } latestGTIDStr := "3ccc475b-2343-11e7-be21-6c0b84d59f30:14" latestGTID, err := gtid.ParserGTID(s.cfg.Flavor, latestGTIDStr) s.eventsGenerator, err = event.NewGenerator(s.cfg.Flavor, uint32(s.cfg.ServerID), 0, latestGTID, previousGTIDSet, 0) if err != nil { - log.L().Fatal("", zap.Error(err)) + c.Fatal("", zap.Error(err)) + c.FailNow() } } @@ -306,7 +308,7 @@ func (s *testSyncerSuite) TestSelectTable(c *C) { {Schema: "~^ptest*", Name: "~^t.*"}, }, } - s.resetEventsGenerator() + s.resetEventsGenerator(c) events := make([]mockBinlogEvent, 0, 24) events = append(events, mockBinlogEvent{typ: DBCreate, args: []interface{}{"s1"}}) events = append(events, mockBinlogEvent{typ: TableCreate, args: []interface{}{"s1", "create table s1.log(id int)"}}) @@ -420,7 +422,7 @@ func (s *testSyncerSuite) TestIgnoreDB(c *C) { IgnoreDBs: []string{"~^b.*", "s1", "stest"}, } - s.resetEventsGenerator() + s.resetEventsGenerator(c) events := make([]mockBinlogEvent, 0, 12) events = append(events, mockBinlogEvent{typ: DBCreate, args: []interface{}{"s1"}}) events = append(events, mockBinlogEvent{typ: DBDrop, args: []interface{}{"s1"}}) @@ -476,7 +478,7 @@ func (s *testSyncerSuite) TestIgnoreTable(c *C) { }, } - s.resetEventsGenerator() + s.resetEventsGenerator(c) events := make([]mockBinlogEvent, 0, 12) events = append(events, mockBinlogEvent{typ: DBCreate, args: []interface{}{"s1"}}) events = append(events, mockBinlogEvent{typ: TableCreate, args: []interface{}{"s1", "create table s1.log(id int)"}}) @@ -605,7 +607,7 @@ func (s *testSyncerSuite) TestSkipDML(c *C) { } s.cfg.BWList = nil - s.resetEventsGenerator() + s.resetEventsGenerator(c) type SQLChecker struct { events []*replication.BinlogEvent @@ -714,7 +716,7 @@ func (s *testSyncerSuite) TestColumnMapping(c *C) { }, } - s.resetEventsGenerator() + s.resetEventsGenerator(c) //create db and tables events := make([]mockBinlogEvent, 0, 24) From 400997c3593940f1796bbe43b9020a859091b863 Mon Sep 17 00:00:00 2001 From: luancheng Date: Tue, 30 Jul 2019 20:54:58 +0800 Subject: [PATCH 4/6] address comments --- syncer/syncer_test.go | 40 ++++++++++++++-------------------------- 1 file changed, 14 insertions(+), 26 deletions(-) diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index 256e7fb78c..1069c8431f 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -18,6 +18,7 @@ import ( "database/sql" "database/sql/driver" "fmt" + "github.com/pingcap/parser" "strings" "sync" "testing" @@ -158,14 +159,12 @@ func (s *testSyncerSuite) resetEventsGenerator(c *C) { previousGTIDSet, err := gtid.ParserGTID(s.cfg.Flavor, previousGTIDSetStr) if err != nil { c.Fatal("", zap.Error(err)) - c.FailNow() } latestGTIDStr := "3ccc475b-2343-11e7-be21-6c0b84d59f30:14" latestGTID, err := gtid.ParserGTID(s.cfg.Flavor, latestGTIDStr) s.eventsGenerator, err = event.NewGenerator(s.cfg.Flavor, uint32(s.cfg.ServerID), 0, latestGTID, previousGTIDSet, 0) if err != nil { c.Fatal("", zap.Error(err)) - c.FailNow() } } @@ -209,6 +208,13 @@ func (s *testSyncerSuite) TearDownSuite(c *C) { s.db.Close() } +func (s *testSyncerSuite) mockParser(db *sql.DB, mock sqlmock.Sqlmock) (*parser.Parser, error) { + mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE"). + WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). + AddRow("sql_mode", "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION")) + return utils.GetParser(db, false) +} + func (s *testSyncerSuite) resetMaster() { s.db.Exec("reset master") } @@ -263,10 +269,7 @@ func (s *testSyncerSuite) TestSelectDB(c *C) { } db, mock, err := sqlmock.New() - mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE"). - WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). - AddRow("sql_mode", "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION")) - p, err := utils.GetParser(db, false) + p, err := s.mockParser(db, mock) c.Assert(err, IsNil) syncer := NewSyncer(s.cfg) @@ -371,10 +374,7 @@ func (s *testSyncerSuite) TestSelectTable(c *C) { } db, mock, err := sqlmock.New() - mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE"). - WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). - AddRow("sql_mode", "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION")) - p, err := utils.GetParser(db, false) + p, err := s.mockParser(db, mock) c.Assert(err, IsNil) syncer := NewSyncer(s.cfg) @@ -442,10 +442,7 @@ func (s *testSyncerSuite) TestIgnoreDB(c *C) { res := []bool{true, true, false, false, true, true, true, true, true, true, false, false} db, mock, err := sqlmock.New() - mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE"). - WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). - AddRow("sql_mode", "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION")) - p, err := utils.GetParser(db, false) + p, err := s.mockParser(db, mock) c.Assert(err, IsNil) syncer := NewSyncer(s.cfg) @@ -533,10 +530,7 @@ func (s *testSyncerSuite) TestIgnoreTable(c *C) { } db, mock, err := sqlmock.New() - mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE"). - WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). - AddRow("sql_mode", "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION")) - p, err := utils.GetParser(db, false) + p, err := s.mockParser(db, mock) c.Assert(err, IsNil) syncer := NewSyncer(s.cfg) @@ -663,10 +657,7 @@ func (s *testSyncerSuite) TestSkipDML(c *C) { sqls = append(sqls, SQLChecker{events: evs, isDML: true, skipped: true}) db, mock, err := sqlmock.New() - mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE"). - WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). - AddRow("sql_mode", "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION")) - p, err := utils.GetParser(db, false) + p, err := s.mockParser(db, mock) c.Assert(err, IsNil) syncer := NewSyncer(s.cfg) @@ -759,10 +750,7 @@ func (s *testSyncerSuite) TestColumnMapping(c *C) { dropEvents := s.generateEvents(events, c) db, mock, err := sqlmock.New() - mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE"). - WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). - AddRow("sql_mode", "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION")) - p, err := utils.GetParser(db, false) + p, err := s.mockParser(db, mock) c.Assert(err, IsNil) mapping, err := cm.NewMapping(false, rules) From ef1437d75fad91109ace19960363bfd47412f17f Mon Sep 17 00:00:00 2001 From: luancheng Date: Wed, 31 Jul 2019 16:29:42 +0800 Subject: [PATCH 5/6] address comment --- syncer/syncer_test.go | 174 ++++++++++++++++++++++-------------------- 1 file changed, 93 insertions(+), 81 deletions(-) diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index 1069c8431f..5d7dd4368c 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -139,11 +139,16 @@ func (s *testSyncerSuite) generateEvents(binlogEvents mockBinlogEvents, c *C) [] Rows: e.args[4].([][]interface{}), }, } - eventType := replication.WRITE_ROWS_EVENTv2 - if e.typ == Update { + var eventType replication.EventType + switch e.typ { + case Write: + eventType = replication.WRITE_ROWS_EVENTv2 + case Update: eventType = replication.UPDATE_ROWS_EVENTv2 - } else if e.typ == Delete { + case Delete: eventType = replication.DELETE_ROWS_EVENTv2 + default: + c.Fatal(fmt.Sprintf("mock event generator don't support event type: %d", e.typ)) } evs, _, err := s.eventsGenerator.GenDMLEvents(eventType, dmlData) c.Assert(err, IsNil) @@ -158,13 +163,13 @@ func (s *testSyncerSuite) resetEventsGenerator(c *C) { previousGTIDSetStr := "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-14,406a3f61-690d-11e7-87c5-6c92bf46f384:1-94321383" previousGTIDSet, err := gtid.ParserGTID(s.cfg.Flavor, previousGTIDSetStr) if err != nil { - c.Fatal("", zap.Error(err)) + c.Fatal(err) } latestGTIDStr := "3ccc475b-2343-11e7-be21-6c0b84d59f30:14" latestGTID, err := gtid.ParserGTID(s.cfg.Flavor, latestGTIDStr) s.eventsGenerator, err = event.NewGenerator(s.cfg.Flavor, uint32(s.cfg.ServerID), 0, latestGTID, previousGTIDSet, 0) if err != nil { - c.Fatal("", zap.Error(err)) + c.Fatal(err) } } @@ -312,34 +317,35 @@ func (s *testSyncerSuite) TestSelectTable(c *C) { }, } s.resetEventsGenerator(c) - events := make([]mockBinlogEvent, 0, 24) - events = append(events, mockBinlogEvent{typ: DBCreate, args: []interface{}{"s1"}}) - events = append(events, mockBinlogEvent{typ: TableCreate, args: []interface{}{"s1", "create table s1.log(id int)"}}) - events = append(events, mockBinlogEvent{typ: DBDrop, args: []interface{}{"s1"}}) - - events = append(events, mockBinlogEvent{typ: TableCreate, args: []interface{}{"mysql", "create table mysql.test(id int)"}}) - events = append(events, mockBinlogEvent{typ: TableDrop, args: []interface{}{"mysql", "test"}}) - events = append(events, mockBinlogEvent{typ: DBCreate, args: []interface{}{"stest"}}) - events = append(events, mockBinlogEvent{typ: TableCreate, args: []interface{}{"stest", "create table stest.log(id int)"}}) - events = append(events, mockBinlogEvent{typ: TableCreate, args: []interface{}{"stest", "create table stest.t(id int)"}}) - events = append(events, mockBinlogEvent{typ: TableCreate, args: []interface{}{"stest", "create table stest.log2(id int)"}}) - events = append(events, mockBinlogEvent{typ: Write, args: []interface{}{uint64(8), "stest", "t", []byte{mysql.MYSQL_TYPE_LONG}, [][]interface{}{{int32(10)}}}}) - events = append(events, mockBinlogEvent{typ: Write, args: []interface{}{uint64(9), "stest", "log", []byte{mysql.MYSQL_TYPE_LONG}, [][]interface{}{{int32(10)}}}}) - events = append(events, mockBinlogEvent{typ: Write, args: []interface{}{uint64(10), "stest", "log2", []byte{mysql.MYSQL_TYPE_LONG}, [][]interface{}{{int32(10)}}}}) - events = append(events, mockBinlogEvent{typ: TableDrop, args: []interface{}{"stest", "log"}}) - events = append(events, mockBinlogEvent{typ: TableDrop, args: []interface{}{"stest", "t"}}) - events = append(events, mockBinlogEvent{typ: TableDrop, args: []interface{}{"stest", "log2"}}) - events = append(events, mockBinlogEvent{typ: DBDrop, args: []interface{}{"stest"}}) - - events = append(events, mockBinlogEvent{typ: DBCreate, args: []interface{}{"t2"}}) - events = append(events, mockBinlogEvent{typ: TableCreate, args: []interface{}{"t2", "create table t2.log(id int)"}}) - events = append(events, mockBinlogEvent{typ: TableCreate, args: []interface{}{"t2", "create table t2.log1(id int)"}}) - events = append(events, mockBinlogEvent{typ: TableDrop, args: []interface{}{"t2", "log"}}) - events = append(events, mockBinlogEvent{typ: DBDrop, args: []interface{}{"t2"}}) - - events = append(events, mockBinlogEvent{typ: DBCreate, args: []interface{}{"ptest1"}}) - events = append(events, mockBinlogEvent{typ: TableCreate, args: []interface{}{"ptest1", "create table ptest1.t1(id int)"}}) - events = append(events, mockBinlogEvent{typ: DBDrop, args: []interface{}{"ptest1"}}) + events := mockBinlogEvents{ + mockBinlogEvent{typ: DBCreate, args: []interface{}{"s1"}}, + mockBinlogEvent{typ: TableCreate, args: []interface{}{"s1", "create table s1.log(id int)"}}, + mockBinlogEvent{typ: DBDrop, args: []interface{}{"s1"}}, + + mockBinlogEvent{typ: TableCreate, args: []interface{}{"mysql", "create table mysql.test(id int)"}}, + mockBinlogEvent{typ: TableDrop, args: []interface{}{"mysql", "test"}}, + mockBinlogEvent{typ: DBCreate, args: []interface{}{"stest"}}, + mockBinlogEvent{typ: TableCreate, args: []interface{}{"stest", "create table stest.log(id int)"}}, + mockBinlogEvent{typ: TableCreate, args: []interface{}{"stest", "create table stest.t(id int)"}}, + mockBinlogEvent{typ: TableCreate, args: []interface{}{"stest", "create table stest.log2(id int)"}}, + mockBinlogEvent{typ: Write, args: []interface{}{uint64(8), "stest", "t", []byte{mysql.MYSQL_TYPE_LONG}, [][]interface{}{{int32(10)}}}}, + mockBinlogEvent{typ: Write, args: []interface{}{uint64(9), "stest", "log", []byte{mysql.MYSQL_TYPE_LONG}, [][]interface{}{{int32(10)}}}}, + mockBinlogEvent{typ: Write, args: []interface{}{uint64(10), "stest", "log2", []byte{mysql.MYSQL_TYPE_LONG}, [][]interface{}{{int32(10)}}}}, + mockBinlogEvent{typ: TableDrop, args: []interface{}{"stest", "log"}}, + mockBinlogEvent{typ: TableDrop, args: []interface{}{"stest", "t"}}, + mockBinlogEvent{typ: TableDrop, args: []interface{}{"stest", "log2"}}, + mockBinlogEvent{typ: DBDrop, args: []interface{}{"stest"}}, + + mockBinlogEvent{typ: DBCreate, args: []interface{}{"t2"}}, + mockBinlogEvent{typ: TableCreate, args: []interface{}{"t2", "create table t2.log(id int)"}}, + mockBinlogEvent{typ: TableCreate, args: []interface{}{"t2", "create table t2.log1(id int)"}}, + mockBinlogEvent{typ: TableDrop, args: []interface{}{"t2", "log"}}, + mockBinlogEvent{typ: DBDrop, args: []interface{}{"t2"}}, + + mockBinlogEvent{typ: DBCreate, args: []interface{}{"ptest1"}}, + mockBinlogEvent{typ: TableCreate, args: []interface{}{"ptest1", "create table ptest1.t1(id int)"}}, + mockBinlogEvent{typ: DBDrop, args: []interface{}{"ptest1"}}, + } allEvents := s.generateEvents(events, c) @@ -423,19 +429,20 @@ func (s *testSyncerSuite) TestIgnoreDB(c *C) { } s.resetEventsGenerator(c) - events := make([]mockBinlogEvent, 0, 12) - events = append(events, mockBinlogEvent{typ: DBCreate, args: []interface{}{"s1"}}) - events = append(events, mockBinlogEvent{typ: DBDrop, args: []interface{}{"s1"}}) - events = append(events, mockBinlogEvent{typ: DBCreate, args: []interface{}{"s2"}}) - events = append(events, mockBinlogEvent{typ: DBDrop, args: []interface{}{"s2"}}) - events = append(events, mockBinlogEvent{typ: DBCreate, args: []interface{}{"btest"}}) - events = append(events, mockBinlogEvent{typ: DBDrop, args: []interface{}{"btest"}}) - events = append(events, mockBinlogEvent{typ: DBCreate, args: []interface{}{"b1"}}) - events = append(events, mockBinlogEvent{typ: DBDrop, args: []interface{}{"b1"}}) - events = append(events, mockBinlogEvent{typ: DBCreate, args: []interface{}{"stest"}}) - events = append(events, mockBinlogEvent{typ: DBDrop, args: []interface{}{"stest"}}) - events = append(events, mockBinlogEvent{typ: DBCreate, args: []interface{}{"st"}}) - events = append(events, mockBinlogEvent{typ: DBDrop, args: []interface{}{"st"}}) + events := mockBinlogEvents{ + mockBinlogEvent{typ: DBCreate, args: []interface{}{"s1"}}, + mockBinlogEvent{typ: DBDrop, args: []interface{}{"s1"}}, + mockBinlogEvent{typ: DBCreate, args: []interface{}{"s2"}}, + mockBinlogEvent{typ: DBDrop, args: []interface{}{"s2"}}, + mockBinlogEvent{typ: DBCreate, args: []interface{}{"btest"}}, + mockBinlogEvent{typ: DBDrop, args: []interface{}{"btest"}}, + mockBinlogEvent{typ: DBCreate, args: []interface{}{"b1"}}, + mockBinlogEvent{typ: DBDrop, args: []interface{}{"b1"}}, + mockBinlogEvent{typ: DBCreate, args: []interface{}{"stest"}}, + mockBinlogEvent{typ: DBDrop, args: []interface{}{"stest"}}, + mockBinlogEvent{typ: DBCreate, args: []interface{}{"st"}}, + mockBinlogEvent{typ: DBDrop, args: []interface{}{"st"}}, + } allEvents := s.generateEvents(events, c) @@ -476,31 +483,33 @@ func (s *testSyncerSuite) TestIgnoreTable(c *C) { } s.resetEventsGenerator(c) - events := make([]mockBinlogEvent, 0, 12) - events = append(events, mockBinlogEvent{typ: DBCreate, args: []interface{}{"s1"}}) - events = append(events, mockBinlogEvent{typ: TableCreate, args: []interface{}{"s1", "create table s1.log(id int)"}}) - events = append(events, mockBinlogEvent{typ: DBDrop, args: []interface{}{"s1"}}) - events = append(events, mockBinlogEvent{typ: TableCreate, args: []interface{}{"mysql", "create table mysql.test(id int)"}}) - events = append(events, mockBinlogEvent{typ: TableDrop, args: []interface{}{"mysql", "test"}}) - - events = append(events, mockBinlogEvent{typ: DBCreate, args: []interface{}{"stest"}}) - events = append(events, mockBinlogEvent{typ: TableCreate, args: []interface{}{"stest", "create table stest.log(id int)"}}) - events = append(events, mockBinlogEvent{typ: TableCreate, args: []interface{}{"stest", "create table stest.t(id int)"}}) - events = append(events, mockBinlogEvent{typ: TableCreate, args: []interface{}{"stest", "create table stest.log2(id int)"}}) - - events = append(events, mockBinlogEvent{typ: Write, args: []interface{}{uint64(8), "stest", "t", []byte{mysql.MYSQL_TYPE_LONG}, [][]interface{}{{int32(10)}}}}) - events = append(events, mockBinlogEvent{typ: Write, args: []interface{}{uint64(9), "stest", "log", []byte{mysql.MYSQL_TYPE_LONG}, [][]interface{}{{int32(10)}}}}) - events = append(events, mockBinlogEvent{typ: Write, args: []interface{}{uint64(10), "stest", "log2", []byte{mysql.MYSQL_TYPE_LONG}, [][]interface{}{{int32(10)}}}}) - events = append(events, mockBinlogEvent{typ: TableDrop, args: []interface{}{"stest", "log"}}) - events = append(events, mockBinlogEvent{typ: TableDrop, args: []interface{}{"stest", "t"}}) - events = append(events, mockBinlogEvent{typ: TableDrop, args: []interface{}{"stest", "log2"}}) - events = append(events, mockBinlogEvent{typ: DBDrop, args: []interface{}{"stest"}}) - - events = append(events, mockBinlogEvent{typ: DBCreate, args: []interface{}{"t2"}}) - events = append(events, mockBinlogEvent{typ: TableCreate, args: []interface{}{"t2", "create table t2.log(id int)"}}) - events = append(events, mockBinlogEvent{typ: TableCreate, args: []interface{}{"t2", "create table t2.log1(id int)"}}) - events = append(events, mockBinlogEvent{typ: TableDrop, args: []interface{}{"t2", "log"}}) - events = append(events, mockBinlogEvent{typ: DBDrop, args: []interface{}{"t2"}}) + events := mockBinlogEvents{ + mockBinlogEvent{typ: DBCreate, args: []interface{}{"s1"}}, + mockBinlogEvent{typ: TableCreate, args: []interface{}{"s1", "create table s1.log(id int)"}}, + mockBinlogEvent{typ: DBDrop, args: []interface{}{"s1"}}, + mockBinlogEvent{typ: TableCreate, args: []interface{}{"mysql", "create table mysql.test(id int)"}}, + mockBinlogEvent{typ: TableDrop, args: []interface{}{"mysql", "test"}}, + + mockBinlogEvent{typ: DBCreate, args: []interface{}{"stest"}}, + mockBinlogEvent{typ: TableCreate, args: []interface{}{"stest", "create table stest.log(id int)"}}, + mockBinlogEvent{typ: TableCreate, args: []interface{}{"stest", "create table stest.t(id int)"}}, + mockBinlogEvent{typ: TableCreate, args: []interface{}{"stest", "create table stest.log2(id int)"}}, + + mockBinlogEvent{typ: Write, args: []interface{}{uint64(8), "stest", "t", []byte{mysql.MYSQL_TYPE_LONG}, [][]interface{}{{int32(10)}}}}, + mockBinlogEvent{typ: Write, args: []interface{}{uint64(9), "stest", "log", []byte{mysql.MYSQL_TYPE_LONG}, [][]interface{}{{int32(10)}}}}, + mockBinlogEvent{typ: Write, args: []interface{}{uint64(10), "stest", "log2", []byte{mysql.MYSQL_TYPE_LONG}, [][]interface{}{{int32(10)}}}}, + // TODO event generator support generate an event with multiple tables DDL + mockBinlogEvent{typ: TableDrop, args: []interface{}{"stest", "log"}}, + mockBinlogEvent{typ: TableDrop, args: []interface{}{"stest", "t"}}, + mockBinlogEvent{typ: TableDrop, args: []interface{}{"stest", "log2"}}, + mockBinlogEvent{typ: DBDrop, args: []interface{}{"stest"}}, + + mockBinlogEvent{typ: DBCreate, args: []interface{}{"t2"}}, + mockBinlogEvent{typ: TableCreate, args: []interface{}{"t2", "create table t2.log(id int)"}}, + mockBinlogEvent{typ: TableCreate, args: []interface{}{"t2", "create table t2.log1(id int)"}}, + mockBinlogEvent{typ: TableDrop, args: []interface{}{"t2", "log"}}, + mockBinlogEvent{typ: DBDrop, args: []interface{}{"t2"}}, + } allEvents := s.generateEvents(events, c) res := [][]bool{ @@ -710,11 +719,12 @@ func (s *testSyncerSuite) TestColumnMapping(c *C) { s.resetEventsGenerator(c) //create db and tables - events := make([]mockBinlogEvent, 0, 24) - events = append(events, mockBinlogEvent{typ: DBCreate, args: []interface{}{"stest_3"}}) - events = append(events, mockBinlogEvent{typ: TableCreate, args: []interface{}{"stest_3", "create table stest_3.log(id varchar(45))"}}) - events = append(events, mockBinlogEvent{typ: TableCreate, args: []interface{}{"stest_3", "create table stest_3.t_2(name varchar(45), id bigint)"}}) - events = append(events, mockBinlogEvent{typ: TableCreate, args: []interface{}{"stest_3", "create table stest_3.a(id int)"}}) + events := mockBinlogEvents{ + mockBinlogEvent{typ: DBCreate, args: []interface{}{"stest_3"}}, + mockBinlogEvent{typ: TableCreate, args: []interface{}{"stest_3", "create table stest_3.log(id varchar(45))"}}, + mockBinlogEvent{typ: TableCreate, args: []interface{}{"stest_3", "create table stest_3.t_2(name varchar(45), id bigint)"}}, + mockBinlogEvent{typ: TableCreate, args: []interface{}{"stest_3", "create table stest_3.a(id int)"}}, + } createEvents := s.generateEvents(events, c) @@ -742,11 +752,13 @@ func (s *testSyncerSuite) TestColumnMapping(c *C) { } // drop tables and db - events = events[:0] - events = append(events, mockBinlogEvent{typ: TableDrop, args: []interface{}{"stest_3", "log"}}) - events = append(events, mockBinlogEvent{typ: TableDrop, args: []interface{}{"stest_3", "t_2"}}) - events = append(events, mockBinlogEvent{typ: TableDrop, args: []interface{}{"stest_3", "a"}}) - events = append(events, mockBinlogEvent{typ: DBDrop, args: []interface{}{"stest_3"}}) + events = mockBinlogEvents{ + // TODO event generator support generate an event with multiple tables DDL + mockBinlogEvent{typ: TableDrop, args: []interface{}{"stest_3", "log"}}, + mockBinlogEvent{typ: TableDrop, args: []interface{}{"stest_3", "t_2"}}, + mockBinlogEvent{typ: TableDrop, args: []interface{}{"stest_3", "a"}}, + mockBinlogEvent{typ: DBDrop, args: []interface{}{"stest_3"}}, + } dropEvents := s.generateEvents(events, c) db, mock, err := sqlmock.New() From 3adaf4933d152b6529c9d0bfae85ad2c84a01e99 Mon Sep 17 00:00:00 2001 From: luancheng Date: Wed, 31 Jul 2019 17:33:18 +0800 Subject: [PATCH 6/6] fix ci --- syncer/syncer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index 5d7dd4368c..fd301d11c1 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -1295,7 +1295,7 @@ func (s *testSyncerSuite) TestSharding(c *C) { runSQL(dropSQLs) s.resetMaster() // must wait for reset Master finish - time.Sleep(time.Second) + time.Sleep(2 * time.Second) db, mock, err := sqlmock.New() if err != nil {