diff --git a/dm/pkg/schema/tracker.go b/dm/pkg/schema/tracker.go index 29d5619200b..f22e4a9ed83 100644 --- a/dm/pkg/schema/tracker.go +++ b/dm/pkg/schema/tracker.go @@ -72,14 +72,14 @@ type Tracker struct { type downstreamTracker struct { downstreamConn *dbconn.DBConn // downstream connection stmtParser *parser.Parser // statement parser - tableInfos map[string]*downstreamTableInfo // downstream table infos + tableInfos map[string]*DownstreamTableInfo // downstream table infos } -// downstreamTableInfo contains tableinfo and index cache. -type downstreamTableInfo struct { - tableInfo *model.TableInfo // tableInfo which comes from parse create statement syntaxtree - indexCache *model.IndexInfo // index cache include pk/uk(not null) - availableUKCache []*model.IndexInfo // index cache include uks(data not null) +// DownstreamTableInfo contains table info and index cache. +type DownstreamTableInfo struct { + TableInfo *model.TableInfo // table info parsed from the create statement syntax tree + AbsoluteUKIndexInfo *model.IndexInfo // the absolute uk index: a pk or a uk(not null) + AvailableUKIndexList []*model.IndexInfo // list of all available uk indexes } // NewTracker creates a new tracker. `sessionCfg` will be set as tracker's session variables if specified, or retrieve @@ -181,7 +181,7 @@ func NewTracker(ctx context.Context, task string, sessionCfg map[string]string, // init downstreamTracker dsTracker := &downstreamTracker{ downstreamConn: downstreamConn, - tableInfos: make(map[string]*downstreamTableInfo), + tableInfos: make(map[string]*DownstreamTableInfo), } return &Tracker{ @@ -375,9 +375,9 @@ func (tr *Tracker) GetSystemVar(name string) (string, bool) { return tr.se.GetSessionVars().GetSystemVar(name) } -// GetDownStreamIndexInfo gets downstream PK/UK(not null) Index. +// GetDownStreamTableInfo gets downstream table info. // note. this function will init downstreamTracker's table info. -func (tr *Tracker) GetDownStreamIndexInfo(tctx *tcontext.Context, tableID string, originTi *model.TableInfo) (*model.IndexInfo, error) { +func (tr *Tracker) GetDownStreamTableInfo(tctx *tcontext.Context, tableID string, originTi *model.TableInfo) (*DownstreamTableInfo, error) { dti, ok := tr.dsTracker.tableInfos[tableID] if !ok { tctx.Logger.Info("Downstream schema tracker init. ", zap.String("tableID", tableID)) @@ -387,10 +387,10 @@ func (tr *Tracker) GetDownStreamIndexInfo(tctx *tcontext.Context, tableID string return nil, err } - dti = getDownStreamTi(ti, originTi) + dti = GetDownStreamTi(ti, originTi) tr.dsTracker.tableInfos[tableID] = dti } - return dti.indexCache, nil + return dti, nil } // GetAvailableDownStreamUKIndexInfo gets available downstream UK whose data is not null.
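Note on the index-selection rule: the GetDownStreamTi hunks below collect every downstream pk/uk that survives redirection to the upstream schema into AvailableUKIndexList, pick the pk (or, failing that, the first uk whose columns are all NOT NULL, or the PKIsHandle fallback from handlePkExCase) as AbsoluteUKIndexInfo, and move that index to the front of the list once, instead of swapping entries on every row as GetAvailableDownStreamUKIndexInfo used to. A minimal standalone sketch of that rule, under simplified assumptions: index, notNull and pickIndexes are illustrative stand-ins, not the real model.IndexInfo types, and redirection/PKIsHandle handling is omitted.

package main

import "fmt"

// index is an illustrative stand-in for model.IndexInfo.
type index struct {
	name    string
	primary bool
	unique  bool
	notNull bool // true if every column of the index is NOT NULL
}

// pickIndexes sketches the GetDownStreamTi selection rule: collect every
// pk/uk, prefer the pk as the absolute index (else the first all-NOT-NULL
// uk), then move the absolute index to the front of the list once.
func pickIndexes(indexes []*index) (absolute *index, available []*index) {
	pos := -1
	for _, idx := range indexes {
		if !idx.primary && !idx.unique {
			continue
		}
		available = append(available, idx)
		if idx.primary || (absolute == nil && idx.notNull) {
			absolute = idx
			pos = len(available) - 1
		}
	}
	// one-time reorder, replacing the per-row swap that this diff removes
	if pos > 0 {
		available[0], available[pos] = available[pos], available[0]
	}
	return absolute, available
}

func main() {
	abs, avail := pickIndexes([]*index{
		{name: "uk_a", unique: true},                // nullable uk
		{name: "uk_b", unique: true, notNull: true}, // uk(not null)
		{name: "pk", primary: true, unique: true, notNull: true},
	})
	fmt.Println(abs.name) // pk
	for _, idx := range avail {
		fmt.Print(idx.name, " ") // pk uk_b uk_a
	}
	fmt.Println()
}

Reordering once at cache-build time leaves the list read-only during per-row lookups, which is presumably why the swap inside GetAvailableDownStreamUKIndexInfo is deleted.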
@@ -398,7 +398,7 @@ func (tr *Tracker) GetDownStreamIndexInfo(tctx *tcontext.Context, tableID string func (tr *Tracker) GetAvailableDownStreamUKIndexInfo(tableID string, data []interface{}) *model.IndexInfo { dti, ok := tr.dsTracker.tableInfos[tableID] - if !ok || len(dti.availableUKCache) == 0 { + if !ok || len(dti.AvailableUKIndexList) == 0 { return nil } // func to check that data is not null @@ -406,13 +406,9 @@ func (tr *Tracker) GetAvailableDownStreamUKIndexInfo(tableID string, data []inte return data[i] != nil } - for i, uk := range dti.availableUKCache { + for _, uk := range dti.AvailableUKIndexList { // check uk's column data is not null if isSpecifiedIndexColumn(uk, fn) { - if i != 0 { - // exchange available uk to the first of the array to reduce judgements for next row - dti.availableUKCache[0], dti.availableUKCache[i] = dti.availableUKCache[i], dti.availableUKCache[0] - } return uk } } @@ -487,12 +483,13 @@ func (tr *Tracker) initDownStreamSQLModeAndParser(tctx *tcontext.Context) error return nil } -// getDownStreamTi constructs downstreamTable index cache by tableinfo. -func getDownStreamTi(ti *model.TableInfo, originTi *model.TableInfo) *downstreamTableInfo { +// GetDownStreamTi constructs downstream table info and index cache from the table info. +func GetDownStreamTi(ti *model.TableInfo, originTi *model.TableInfo) *DownstreamTableInfo { var ( - indexCache *model.IndexInfo - availableUKCache = make([]*model.IndexInfo, 0, len(ti.Indices)) - hasPk = false + absoluteUKIndexInfo *model.IndexInfo + availableUKIndexList = []*model.IndexInfo{} + hasPk = false + absoluteUKPosition = -1 ) // func to check the not null constraint @@ -500,7 +497,7 @@ func getDownStreamTi(ti *model.TableInfo, originTi *model.TableInfo) *downstream return mysql.HasNotNullFlag(ti.Columns[i].Flag) } for _, idx := range ti.Indices { if !idx.Primary && !idx.Unique { continue } @@ -508,15 +505,16 @@ func getDownStreamTi(ti *model.TableInfo, originTi *model.TableInfo) *downstream if indexRedirect == nil { continue } + availableUKIndexList = append(availableUKIndexList, indexRedirect) if idx.Primary { - indexCache = indexRedirect + absoluteUKIndexInfo = indexRedirect + absoluteUKPosition = len(availableUKIndexList) - 1 hasPk = true - } else if idx.Unique { + } else { // second check not null unique key - if !hasPk && isSpecifiedIndexColumn(idx, fn) { - indexCache = indexRedirect - } else { - availableUKCache = append(availableUKCache, indexRedirect) + if absoluteUKIndexInfo == nil && isSpecifiedIndexColumn(idx, fn) { + absoluteUKIndexInfo = indexRedirect + absoluteUKPosition = len(availableUKIndexList) - 1 } } } @@ -526,14 +524,21 @@ func getDownStreamTi(ti *model.TableInfo, originTi *model.TableInfo) *downstream if !hasPk { exPk := redirectIndexKeys(handlePkExCase(ti), originTi) if exPk != nil { - indexCache = exPk + absoluteUKIndexInfo = exPk + absoluteUKPosition = len(availableUKIndexList) + availableUKIndexList = append(availableUKIndexList, absoluteUKIndexInfo) } } - return &downstreamTableInfo{ - tableInfo: ti, - indexCache: indexCache, - availableUKCache: availableUKCache, + // move absoluteUKIndexInfo to the front of availableUKIndexList + if absoluteUKPosition != -1 && len(availableUKIndexList) > 1 { + availableUKIndexList[0], availableUKIndexList[absoluteUKPosition] = availableUKIndexList[absoluteUKPosition], availableUKIndexList[0] + } + + return &DownstreamTableInfo{ + TableInfo: ti, + AbsoluteUKIndexInfo: absoluteUKIndexInfo, + AvailableUKIndexList: availableUKIndexList, } } diff --git a/dm/pkg/schema/tracker_test.go
b/dm/pkg/schema/tracker_test.go index df0922182b5..3180c7af325 100644 --- a/dm/pkg/schema/tracker_test.go +++ b/dm/pkg/schema/tracker_test.go @@ -603,145 +603,140 @@ func (s *trackerSuite) TestGetDownStreamIndexInfo(c *C) { mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( sqlmock.NewRows([]string{"Table", "Create Table"}). AddRow("test", "create table t(a int, b int, c varchar(10))")) - indexinfo, err := tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + dti, err := tracker.GetDownStreamTableInfo(tcontext.Background(), tableID, oriTi) c.Assert(err, IsNil) - _, ok := tracker.dsTracker.tableInfos[tableID] - c.Assert(ok, IsTrue) - c.Assert(indexinfo, IsNil) + c.Assert(dti, NotNil) + c.Assert(dti.AbsoluteUKIndexInfo, IsNil) delete(tracker.dsTracker.tableInfos, tableID) // downstream has pk(not constraints like "create table t(a int primary key,b int not null)" mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( sqlmock.NewRows([]string{"Table", "Create Table"}). AddRow("test", "create table t(a int, b int, c varchar(10), PRIMARY KEY (c))")) - indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + dti, err = tracker.GetDownStreamTableInfo(tcontext.Background(), tableID, oriTi) c.Assert(err, IsNil) - _, ok = tracker.dsTracker.tableInfos[tableID] - c.Assert(ok, IsTrue) - c.Assert(indexinfo, NotNil) + c.Assert(dti, NotNil) + c.Assert(dti.AbsoluteUKIndexInfo, NotNil) delete(tracker.dsTracker.tableInfos, tableID) mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( sqlmock.NewRows([]string{"Table", "Create Table"}). AddRow("test", "create table t(a int primary key, b int, c varchar(10))")) - indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + dti, err = tracker.GetDownStreamTableInfo(tcontext.Background(), tableID, oriTi) c.Assert(err, IsNil) - _, ok = tracker.dsTracker.tableInfos[tableID] - c.Assert(ok, IsTrue) - c.Assert(indexinfo, NotNil) + c.Assert(dti, NotNil) + c.Assert(dti.AbsoluteUKIndexInfo, NotNil) delete(tracker.dsTracker.tableInfos, tableID) // downstream has composite pks mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( sqlmock.NewRows([]string{"Table", "Create Table"}). AddRow("test", "create table t(a int, b int, c varchar(10), PRIMARY KEY (a,b))")) - indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + dti, err = tracker.GetDownStreamTableInfo(tcontext.Background(), tableID, oriTi) c.Assert(err, IsNil) - _, ok = tracker.dsTracker.tableInfos[tableID] - c.Assert(ok, IsTrue) - c.Assert(len(indexinfo.Columns) == 2, IsTrue) + c.Assert(dti, NotNil) + c.Assert(dti.AbsoluteUKIndexInfo, NotNil) + c.Assert(len(dti.AbsoluteUKIndexInfo.Columns) == 2, IsTrue) delete(tracker.dsTracker.tableInfos, tableID) // downstream has uk(not null) mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( sqlmock.NewRows([]string{"Table", "Create Table"}). 
AddRow("test", "create table t(a int unique not null, b int, c varchar(10))")) - indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + dti, err = tracker.GetDownStreamTableInfo(tcontext.Background(), tableID, oriTi) c.Assert(err, IsNil) - _, ok = tracker.dsTracker.tableInfos[tableID] - c.Assert(ok, IsTrue) - c.Assert(indexinfo.Columns, NotNil) + c.Assert(dti, NotNil) + c.Assert(dti.AbsoluteUKIndexInfo, NotNil) + c.Assert(dti.AbsoluteUKIndexInfo.Columns, NotNil) delete(tracker.dsTracker.tableInfos, tableID) // downstream has uk(without not null) mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( sqlmock.NewRows([]string{"Table", "Create Table"}). AddRow("test", "create table t(a int unique, b int, c varchar(10))")) - indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + dti, err = tracker.GetDownStreamTableInfo(tcontext.Background(), tableID, oriTi) c.Assert(err, IsNil) - dti, ok := tracker.dsTracker.tableInfos[tableID] - c.Assert(ok, IsTrue) - c.Assert(indexinfo, IsNil) - c.Assert(dti.availableUKCache, NotNil) + c.Assert(dti, NotNil) + c.Assert(dti.AbsoluteUKIndexInfo, IsNil) + c.Assert(dti.AvailableUKIndexList, NotNil) delete(tracker.dsTracker.tableInfos, tableID) // downstream has uks mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( sqlmock.NewRows([]string{"Table", "Create Table"}). AddRow("test", "create table t(a int unique, b int unique, c varchar(10) unique not null)")) - indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + dti, err = tracker.GetDownStreamTableInfo(tcontext.Background(), tableID, oriTi) c.Assert(err, IsNil) - dti, ok = tracker.dsTracker.tableInfos[tableID] - c.Assert(ok, IsTrue) - c.Assert(indexinfo, NotNil) - c.Assert(len(dti.availableUKCache) == 2, IsTrue) + c.Assert(dti, NotNil) + c.Assert(dti.AbsoluteUKIndexInfo, NotNil) + c.Assert(len(dti.AvailableUKIndexList) == 3, IsTrue) + c.Assert(dti.AvailableUKIndexList[0] == dti.AbsoluteUKIndexInfo, IsTrue) delete(tracker.dsTracker.tableInfos, tableID) // downstream has pk and uk, pk has priority mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( sqlmock.NewRows([]string{"Table", "Create Table"}). AddRow("test", "create table t(a int unique not null , b int, c varchar(10), PRIMARY KEY (c))")) - indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + dti, err = tracker.GetDownStreamTableInfo(tcontext.Background(), tableID, oriTi) c.Assert(err, IsNil) - c.Assert(indexinfo.Primary, IsTrue) + c.Assert(dti.AbsoluteUKIndexInfo.Primary, IsTrue) + c.Assert(len(dti.AvailableUKIndexList) == 2, IsTrue) + c.Assert(dti.AvailableUKIndexList[0] == dti.AbsoluteUKIndexInfo, IsTrue) delete(tracker.dsTracker.tableInfos, tableID) // downstream has more columns than upstream, and that column in used in PK mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( sqlmock.NewRows([]string{"Table", "Create Table"}). 
AddRow("test", "create table t(a int , d int PRIMARY KEY, c varchar(10), b int unique not null)")) - indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + dti, err = tracker.GetDownStreamTableInfo(tcontext.Background(), tableID, oriTi) c.Assert(err, IsNil) - c.Assert(indexinfo, NotNil) - c.Assert(indexinfo.Primary, IsFalse) + c.Assert(dti.AbsoluteUKIndexInfo, NotNil) + c.Assert(dti.AbsoluteUKIndexInfo.Primary, IsFalse) + c.Assert(len(dti.AvailableUKIndexList) == 1, IsTrue) delete(tracker.dsTracker.tableInfos, tableID) mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( sqlmock.NewRows([]string{"Table", "Create Table"}). AddRow("test", "create table t(a int , d int PRIMARY KEY, c varchar(10), b int unique)")) - indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + dti, err = tracker.GetDownStreamTableInfo(tcontext.Background(), tableID, oriTi) c.Assert(err, IsNil) - c.Assert(indexinfo, IsNil) - dti, ok = tracker.dsTracker.tableInfos[tableID] - c.Assert(ok, IsTrue) - c.Assert(len(dti.availableUKCache) == 1, IsTrue) + c.Assert(dti.AbsoluteUKIndexInfo, IsNil) + c.Assert(len(dti.AvailableUKIndexList) == 1, IsTrue) delete(tracker.dsTracker.tableInfos, tableID) mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( sqlmock.NewRows([]string{"Table", "Create Table"}). AddRow("test", "create table t(a int , d int PRIMARY KEY, c varchar(10), b int)")) - indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + dti, err = tracker.GetDownStreamTableInfo(tcontext.Background(), tableID, oriTi) c.Assert(err, IsNil) - c.Assert(indexinfo, IsNil) + c.Assert(dti.AbsoluteUKIndexInfo, IsNil) delete(tracker.dsTracker.tableInfos, tableID) // downstream has more columns than upstream, and that column in used in UK(not null) mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( sqlmock.NewRows([]string{"Table", "Create Table"}). AddRow("test", "create table t(a int , d int unique not null, c varchar(10), b int unique not null)")) - indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + dti, err = tracker.GetDownStreamTableInfo(tcontext.Background(), tableID, oriTi) c.Assert(err, IsNil) - c.Assert(indexinfo, NotNil) - c.Assert(indexinfo.Columns[0].Name.L == "b", IsTrue) + c.Assert(dti.AbsoluteUKIndexInfo, NotNil) + c.Assert(dti.AbsoluteUKIndexInfo.Columns[0].Name.L == "b", IsTrue) delete(tracker.dsTracker.tableInfos, tableID) mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( sqlmock.NewRows([]string{"Table", "Create Table"}). AddRow("test", "create table t(a int , d int unique not null, c varchar(10), b int unique)")) - indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + dti, err = tracker.GetDownStreamTableInfo(tcontext.Background(), tableID, oriTi) c.Assert(err, IsNil) - c.Assert(indexinfo, IsNil) - dti, ok = tracker.dsTracker.tableInfos[tableID] - c.Assert(ok, IsTrue) - c.Assert(len(dti.availableUKCache) == 1, IsTrue) + c.Assert(dti.AbsoluteUKIndexInfo, IsNil) + c.Assert(len(dti.AvailableUKIndexList) == 1, IsTrue) delete(tracker.dsTracker.tableInfos, tableID) mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( sqlmock.NewRows([]string{"Table", "Create Table"}). 
AddRow("test", "create table t(a int , d int unique not null, c varchar(10), b int)")) - indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + dti, err = tracker.GetDownStreamTableInfo(tcontext.Background(), tableID, oriTi) c.Assert(err, IsNil) - c.Assert(indexinfo, IsNil) + c.Assert(dti.AbsoluteUKIndexInfo, IsNil) delete(tracker.dsTracker.tableInfos, tableID) } @@ -777,11 +772,11 @@ func (s *trackerSuite) TestGetAvailableDownStreanUKIndexInfo(c *C) { mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( sqlmock.NewRows([]string{"Table", "Create Table"}). AddRow("test", "create table t(a int, b int, c varchar(10))")) - indexinfo, err := tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + dti, err := tracker.GetDownStreamTableInfo(tcontext.Background(), tableID, oriTi) c.Assert(err, IsNil) - c.Assert(indexinfo, IsNil) + c.Assert(dti.AbsoluteUKIndexInfo, IsNil) data := []interface{}{1, 2, 3} - indexinfo = tracker.GetAvailableDownStreamUKIndexInfo(tableID, data) + indexinfo := tracker.GetAvailableDownStreamUKIndexInfo(tableID, data) c.Assert(indexinfo, IsNil) delete(tracker.dsTracker.tableInfos, tableID) @@ -789,9 +784,9 @@ func (s *trackerSuite) TestGetAvailableDownStreanUKIndexInfo(c *C) { mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( sqlmock.NewRows([]string{"Table", "Create Table"}). AddRow("test", "create table t(a int unique, b int, c varchar(10))")) - indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + dti, err = tracker.GetDownStreamTableInfo(tcontext.Background(), tableID, oriTi) c.Assert(err, IsNil) - c.Assert(indexinfo, IsNil) + c.Assert(dti.AbsoluteUKIndexInfo, IsNil) data = []interface{}{nil, 2, 3} indexinfo = tracker.GetAvailableDownStreamUKIndexInfo(tableID, data) c.Assert(indexinfo, IsNil) @@ -801,9 +796,9 @@ func (s *trackerSuite) TestGetAvailableDownStreanUKIndexInfo(c *C) { mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( sqlmock.NewRows([]string{"Table", "Create Table"}). AddRow("test", "create table t(a int unique, b int, c varchar(10))")) - indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + dti, err = tracker.GetDownStreamTableInfo(tcontext.Background(), tableID, oriTi) c.Assert(err, IsNil) - c.Assert(indexinfo, IsNil) + c.Assert(dti.AbsoluteUKIndexInfo, IsNil) data = []interface{}{1, 2, 3} indexinfo = tracker.GetAvailableDownStreamUKIndexInfo(tableID, data) c.Assert(indexinfo, NotNil) @@ -813,9 +808,9 @@ func (s *trackerSuite) TestGetAvailableDownStreanUKIndexInfo(c *C) { mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( sqlmock.NewRows([]string{"Table", "Create Table"}). AddRow("test", "create table t(a int, b int, c varchar(10), unique key(a, b))")) - indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + dti, err = tracker.GetDownStreamTableInfo(tcontext.Background(), tableID, oriTi) c.Assert(err, IsNil) - c.Assert(indexinfo, IsNil) + c.Assert(dti.AbsoluteUKIndexInfo, IsNil) data = []interface{}{1, nil, 3} indexinfo = tracker.GetAvailableDownStreamUKIndexInfo(tableID, data) c.Assert(indexinfo, IsNil) @@ -824,9 +819,9 @@ func (s *trackerSuite) TestGetAvailableDownStreanUKIndexInfo(c *C) { mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( sqlmock.NewRows([]string{"Table", "Create Table"}). 
AddRow("test", "create table t(a int, b int, c varchar(10), unique key(a, b))")) - indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + dti, err = tracker.GetDownStreamTableInfo(tcontext.Background(), tableID, oriTi) c.Assert(err, IsNil) - c.Assert(indexinfo, IsNil) + c.Assert(dti.AbsoluteUKIndexInfo, IsNil) data = []interface{}{1, nil, nil} indexinfo = tracker.GetAvailableDownStreamUKIndexInfo(tableID, data) c.Assert(indexinfo, IsNil) @@ -836,9 +831,9 @@ func (s *trackerSuite) TestGetAvailableDownStreanUKIndexInfo(c *C) { mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( sqlmock.NewRows([]string{"Table", "Create Table"}). AddRow("test", "create table t(a int, b int, c varchar(10), unique key(a, b))")) - indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + dti, err = tracker.GetDownStreamTableInfo(tcontext.Background(), tableID, oriTi) c.Assert(err, IsNil) - c.Assert(indexinfo, IsNil) + c.Assert(dti.AbsoluteUKIndexInfo, IsNil) data = []interface{}{1, 2, 3} indexinfo = tracker.GetAvailableDownStreamUKIndexInfo(tableID, data) c.Assert(indexinfo, NotNil) @@ -876,7 +871,7 @@ func (s *trackerSuite) TestReTrackDownStreamIndex(c *C) { mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( sqlmock.NewRows([]string{"Table", "Create Table"}). AddRow("test", "create table t(a int, b int, c varchar(10), PRIMARY KEY (a,b))")) - _, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + _, err = tracker.GetDownStreamTableInfo(tcontext.Background(), tableID, oriTi) c.Assert(err, IsNil) _, ok := tracker.dsTracker.tableInfos[tableID] c.Assert(ok, IsTrue) @@ -890,7 +885,7 @@ func (s *trackerSuite) TestReTrackDownStreamIndex(c *C) { mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( sqlmock.NewRows([]string{"Table", "Create Table"}). AddRow("test", "create table t(a int, b int, c varchar(10), PRIMARY KEY (a,b))")) - _, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + _, err = tracker.GetDownStreamTableInfo(tcontext.Background(), tableID, oriTi) c.Assert(err, IsNil) _, ok = tracker.dsTracker.tableInfos[tableID] c.Assert(ok, IsTrue) @@ -902,7 +897,7 @@ func (s *trackerSuite) TestReTrackDownStreamIndex(c *C) { mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( sqlmock.NewRows([]string{"Table", "Create Table"}). AddRow("test", "create table t(a int primary key, b int, c varchar(10))")) - _, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + _, err = tracker.GetDownStreamTableInfo(tcontext.Background(), tableID, oriTi) c.Assert(err, IsNil) _, ok = tracker.dsTracker.tableInfos[tableID] c.Assert(ok, IsTrue) @@ -916,7 +911,7 @@ func (s *trackerSuite) TestReTrackDownStreamIndex(c *C) { mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( sqlmock.NewRows([]string{"Table", "Create Table"}). AddRow("test", "create table t(a int, b int, c varchar(10), PRIMARY KEY (a,b))")) - _, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + _, err = tracker.GetDownStreamTableInfo(tcontext.Background(), tableID, oriTi) c.Assert(err, IsNil) _, ok = tracker.dsTracker.tableInfos[tableID] c.Assert(ok, IsTrue) @@ -928,7 +923,7 @@ func (s *trackerSuite) TestReTrackDownStreamIndex(c *C) { mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows( sqlmock.NewRows([]string{"Table", "Create Table"}). 
AddRow("test", "create table t(a int primary key, b int, c varchar(10))")) - _, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi) + _, err = tracker.GetDownStreamTableInfo(tcontext.Background(), tableID, oriTi) c.Assert(err, IsNil) _, ok = tracker.dsTracker.tableInfos[tableID] c.Assert(ok, IsTrue) diff --git a/dm/syncer/causality_test.go b/dm/syncer/causality_test.go index 9129abbac65..13e99125ca9 100644 --- a/dm/syncer/causality_test.go +++ b/dm/syncer/causality_test.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/ticdc/dm/pkg/binlog" tcontext "github.com/pingcap/ticdc/dm/pkg/context" "github.com/pingcap/ticdc/dm/pkg/log" + "github.com/pingcap/ticdc/dm/pkg/schema" "github.com/pingcap/ticdc/dm/pkg/utils" ) @@ -56,8 +57,8 @@ func (s *testSyncerSuite) TestDetectConflict(c *C) { func (s *testSyncerSuite) TestCasuality(c *C) { p := parser.New() se := mock.NewContext() - schema := "create table tb(a int primary key, b int unique);" - ti, err := createTableInfo(p, se, int64(0), schema) + schemaStr := "create table tb(a int primary key, b int unique);" + ti, err := createTableInfo(p, se, int64(0), schemaStr) c.Assert(err, IsNil) tiIndex := &model.IndexInfo{ Table: ti.Name, @@ -71,6 +72,8 @@ func (s *testSyncerSuite) TestCasuality(c *C) { Length: types.UnspecifiedLength, }}, } + downTi := schema.GetDownStreamTi(ti, ti) + c.Assert(downTi, NotNil) jobCh := make(chan *job, 10) syncer := &Syncer{ @@ -117,7 +120,7 @@ func (s *testSyncerSuite) TestCasuality(c *C) { ec := &eventContext{startLocation: &location, currentLocation: &location, lastLocation: &location} for _, tc := range testCases { - job := newDMLJob(tc.op, table, table, newDML(tc.op, false, "", table, tc.oldVals, tc.vals, tc.oldVals, tc.vals, ti.Columns, ti, tiIndex), ec) + job := newDMLJob(tc.op, table, table, newDML(tc.op, false, "", table, tc.oldVals, tc.vals, tc.oldVals, tc.vals, ti.Columns, ti, tiIndex, downTi), ec) jobCh <- job } diff --git a/dm/syncer/compactor_test.go b/dm/syncer/compactor_test.go index d8c1dc5276b..b78d225be42 100644 --- a/dm/syncer/compactor_test.go +++ b/dm/syncer/compactor_test.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/ticdc/dm/pkg/binlog" tcontext "github.com/pingcap/ticdc/dm/pkg/context" "github.com/pingcap/ticdc/dm/pkg/log" + "github.com/pingcap/ticdc/dm/pkg/schema" "github.com/pingcap/ticdc/dm/pkg/utils" ) @@ -75,8 +76,8 @@ func (s *testSyncerSuite) TestCompactJob(c *C) { targetTableID := "`test`.`tb`" sourceTable := &filter.Table{Schema: "test", Name: "tb1"} targetTable := &filter.Table{Schema: "test", Name: "tb"} - schema := "create table test.tb(id int primary key, col1 int, name varchar(24))" - ti, err := createTableInfo(p, se, 0, schema) + schemaStr := "create table test.tb(id int primary key, col1 int, name varchar(24))" + ti, err := createTableInfo(p, se, 0, schemaStr) c.Assert(err, IsNil) tiIndex := &model.IndexInfo{ Table: ti.Name, @@ -90,6 +91,8 @@ func (s *testSyncerSuite) TestCompactJob(c *C) { Length: types.UnspecifiedLength, }}, } + downTi := schema.GetDownStreamTi(ti, ti) + c.Assert(downTi, NotNil) var dml *DML var dmls []*DML @@ -108,7 +111,7 @@ func (s *testSyncerSuite) TestCompactJob(c *C) { oldValues, ok := kv[newID] if !ok { // insert - dml = newDML(insert, false, targetTableID, sourceTable, nil, values, nil, values, ti.Columns, ti, tiIndex) + dml = newDML(insert, false, targetTableID, sourceTable, nil, values, nil, values, ti.Columns, ti, tiIndex, downTi) } else { if rand.Int()%2 > 0 { // update @@ -122,10 +125,10 @@ func (s *testSyncerSuite) 
TestCompactJob(c *C) { } } } - dml = newDML(update, false, targetTableID, sourceTable, oldValues, values, oldValues, values, ti.Columns, ti, tiIndex) + dml = newDML(update, false, targetTableID, sourceTable, oldValues, values, oldValues, values, ti.Columns, ti, tiIndex, downTi) } else { // delete - dml = newDML(del, false, targetTableID, sourceTable, nil, oldValues, nil, oldValues, ti.Columns, ti, tiIndex) + dml = newDML(del, false, targetTableID, sourceTable, nil, oldValues, nil, oldValues, ti.Columns, ti, tiIndex, downTi) } } @@ -190,8 +193,8 @@ func (s *testSyncerSuite) TestCompactorSafeMode(c *C) { targetTableID := "`test`.`tb`" sourceTable := &filter.Table{Schema: "test", Name: "tb1"} targetTable := &filter.Table{Schema: "test", Name: "tb"} - schema := "create table test.tb(id int primary key, col1 int, name varchar(24))" - ti, err := createTableInfo(p, se, 0, schema) + schemaStr := "create table test.tb(id int primary key, col1 int, name varchar(24))" + ti, err := createTableInfo(p, se, 0, schemaStr) c.Assert(err, IsNil) tiIndex := &model.IndexInfo{ Table: ti.Name, @@ -205,6 +208,8 @@ func (s *testSyncerSuite) TestCompactorSafeMode(c *C) { Length: types.UnspecifiedLength, }}, } + downTi := schema.GetDownStreamTi(ti, ti) + c.Assert(downTi, NotNil) testCases := []struct { input []*DML @@ -213,31 +218,31 @@ func (s *testSyncerSuite) TestCompactorSafeMode(c *C) { // nolint:dupl { input: []*DML{ - newDML(insert, true, targetTableID, sourceTable, nil, []interface{}{1, 1, "a"}, nil, []interface{}{1, 1, "a"}, ti.Columns, ti, tiIndex), - newDML(insert, true, targetTableID, sourceTable, nil, []interface{}{2, 2, "b"}, nil, []interface{}{2, 2, "b"}, ti.Columns, ti, tiIndex), - newDML(update, true, targetTableID, sourceTable, []interface{}{1, 1, "a"}, []interface{}{3, 3, "c"}, []interface{}{1, 1, "a"}, []interface{}{3, 3, "c"}, ti.Columns, ti, tiIndex), - newDML(del, false, targetTableID, sourceTable, nil, []interface{}{2, 2, "b"}, nil, []interface{}{2, 2, "b"}, ti.Columns, ti, tiIndex), - newDML(insert, false, targetTableID, sourceTable, nil, []interface{}{1, 1, "a"}, nil, []interface{}{1, 1, "a"}, ti.Columns, ti, tiIndex), + newDML(insert, true, targetTableID, sourceTable, nil, []interface{}{1, 1, "a"}, nil, []interface{}{1, 1, "a"}, ti.Columns, ti, tiIndex, downTi), + newDML(insert, true, targetTableID, sourceTable, nil, []interface{}{2, 2, "b"}, nil, []interface{}{2, 2, "b"}, ti.Columns, ti, tiIndex, downTi), + newDML(update, true, targetTableID, sourceTable, []interface{}{1, 1, "a"}, []interface{}{3, 3, "c"}, []interface{}{1, 1, "a"}, []interface{}{3, 3, "c"}, ti.Columns, ti, tiIndex, downTi), + newDML(del, false, targetTableID, sourceTable, nil, []interface{}{2, 2, "b"}, nil, []interface{}{2, 2, "b"}, ti.Columns, ti, tiIndex, downTi), + newDML(insert, false, targetTableID, sourceTable, nil, []interface{}{1, 1, "a"}, nil, []interface{}{1, 1, "a"}, ti.Columns, ti, tiIndex, downTi), }, output: []*DML{ - newDML(insert, true, targetTableID, sourceTable, nil, []interface{}{3, 3, "c"}, nil, []interface{}{3, 3, "c"}, ti.Columns, ti, tiIndex), - newDML(del, true, targetTableID, sourceTable, nil, []interface{}{2, 2, "b"}, nil, []interface{}{2, 2, "b"}, ti.Columns, ti, tiIndex), - newDML(insert, true, targetTableID, sourceTable, nil, []interface{}{1, 1, "a"}, nil, []interface{}{1, 1, "a"}, ti.Columns, ti, tiIndex), + newDML(insert, true, targetTableID, sourceTable, nil, []interface{}{3, 3, "c"}, nil, []interface{}{3, 3, "c"}, ti.Columns, ti, tiIndex, downTi), + newDML(del, true, targetTableID, 
sourceTable, nil, []interface{}{2, 2, "b"}, nil, []interface{}{2, 2, "b"}, ti.Columns, ti, tiIndex, downTi), + newDML(insert, true, targetTableID, sourceTable, nil, []interface{}{1, 1, "a"}, nil, []interface{}{1, 1, "a"}, ti.Columns, ti, tiIndex, downTi), }, }, // nolint:dupl { input: []*DML{ - newDML(insert, false, targetTableID, sourceTable, nil, []interface{}{1, 1, "a"}, nil, []interface{}{1, 1, "a"}, ti.Columns, ti, tiIndex), - newDML(insert, false, targetTableID, sourceTable, nil, []interface{}{2, 2, "b"}, nil, []interface{}{2, 2, "b"}, ti.Columns, ti, tiIndex), - newDML(update, false, targetTableID, sourceTable, []interface{}{1, 1, "a"}, []interface{}{3, 3, "c"}, []interface{}{1, 1, "a"}, []interface{}{3, 3, "c"}, ti.Columns, ti, tiIndex), - newDML(del, false, targetTableID, sourceTable, nil, []interface{}{2, 2, "b"}, nil, []interface{}{2, 2, "b"}, ti.Columns, ti, tiIndex), - newDML(insert, false, targetTableID, sourceTable, nil, []interface{}{1, 1, "a"}, nil, []interface{}{1, 1, "a"}, ti.Columns, ti, tiIndex), + newDML(insert, false, targetTableID, sourceTable, nil, []interface{}{1, 1, "a"}, nil, []interface{}{1, 1, "a"}, ti.Columns, ti, tiIndex, downTi), + newDML(insert, false, targetTableID, sourceTable, nil, []interface{}{2, 2, "b"}, nil, []interface{}{2, 2, "b"}, ti.Columns, ti, tiIndex, downTi), + newDML(update, false, targetTableID, sourceTable, []interface{}{1, 1, "a"}, []interface{}{3, 3, "c"}, []interface{}{1, 1, "a"}, []interface{}{3, 3, "c"}, ti.Columns, ti, tiIndex, downTi), + newDML(del, false, targetTableID, sourceTable, nil, []interface{}{2, 2, "b"}, nil, []interface{}{2, 2, "b"}, ti.Columns, ti, tiIndex, downTi), + newDML(insert, false, targetTableID, sourceTable, nil, []interface{}{1, 1, "a"}, nil, []interface{}{1, 1, "a"}, ti.Columns, ti, tiIndex, downTi), }, output: []*DML{ - newDML(insert, false, targetTableID, sourceTable, nil, []interface{}{3, 3, "c"}, nil, []interface{}{3, 3, "c"}, ti.Columns, ti, tiIndex), - newDML(del, false, targetTableID, sourceTable, nil, []interface{}{2, 2, "b"}, nil, []interface{}{2, 2, "b"}, ti.Columns, ti, tiIndex), - newDML(insert, true, targetTableID, sourceTable, nil, []interface{}{1, 1, "a"}, nil, []interface{}{1, 1, "a"}, ti.Columns, ti, tiIndex), + newDML(insert, false, targetTableID, sourceTable, nil, []interface{}{3, 3, "c"}, nil, []interface{}{3, 3, "c"}, ti.Columns, ti, tiIndex, downTi), + newDML(del, false, targetTableID, sourceTable, nil, []interface{}{2, 2, "b"}, nil, []interface{}{2, 2, "b"}, ti.Columns, ti, tiIndex, downTi), + newDML(insert, true, targetTableID, sourceTable, nil, []interface{}{1, 1, "a"}, nil, []interface{}{1, 1, "a"}, ti.Columns, ti, tiIndex, downTi), }, }, } diff --git a/dm/syncer/dml.go b/dm/syncer/dml.go index 744dbe678af..c9b72f482d8 100644 --- a/dm/syncer/dml.go +++ b/dm/syncer/dml.go @@ -30,6 +30,7 @@ import ( tcontext "github.com/pingcap/ticdc/dm/pkg/context" "github.com/pingcap/ticdc/dm/pkg/log" + "github.com/pingcap/ticdc/dm/pkg/schema" "github.com/pingcap/ticdc/dm/pkg/terror" ) @@ -76,8 +77,9 @@ func extractValueFromData(data []interface{}, columns []*model.ColumnInfo) []int return value } -func (s *Syncer) genAndFilterInsertDMLs(param *genDMLParam, filterExprs []expression.Expression) ([]*DML, error) { +func (s *Syncer) genAndFilterInsertDMLs(tctx *tcontext.Context, param *genDMLParam, filterExprs []expression.Expression) ([]*DML, error) { var ( + tableID = param.targetTableID dataSeq = param.data originalDataSeq = param.originalData columns = param.columns @@ -85,6 +87,13 @@ func (s 
*Syncer) genAndFilterInsertDMLs(param *genDMLParam, filterExprs []expres dmls = make([]*DML, 0, len(dataSeq)) ) + // if downstream pk/uk(not null) exists, then use downstream pk/uk(not null) + downstreamTableInfo, err := s.schemaTracker.GetDownStreamTableInfo(tctx, tableID, ti) + if err != nil { + return nil, err + } + downstreamIndexColumns := downstreamTableInfo.AbsoluteUKIndexInfo + RowLoop: for dataIdx, data := range dataSeq { if len(data) != len(columns) { @@ -108,7 +117,11 @@ RowLoop: } } - dmls = append(dmls, newDML(insert, param.safeMode, param.targetTableID, param.sourceTable, nil, value, nil, originalValue, columns, ti, nil)) + if downstreamIndexColumns == nil { + downstreamIndexColumns = s.schemaTracker.GetAvailableDownStreamUKIndexInfo(tableID, value) + } + + dmls = append(dmls, newDML(insert, param.safeMode, tableID, param.sourceTable, nil, value, nil, originalValue, columns, ti, downstreamIndexColumns, downstreamTableInfo)) } return dmls, nil @@ -130,10 +143,11 @@ func (s *Syncer) genAndFilterUpdateDMLs( ) // if downstream pk/uk(not null) exists, then use downstream pk/uk(not null) - downstreamIndexColumns, err := s.schemaTracker.GetDownStreamIndexInfo(tctx, tableID, ti) + downstreamTableInfo, err := s.schemaTracker.GetDownStreamTableInfo(tctx, tableID, ti) if err != nil { return nil, err } + downstreamIndexColumns := downstreamTableInfo.AbsoluteUKIndexInfo RowLoop: for i := 0; i < len(data); i += 2 { @@ -184,7 +198,7 @@ RowLoop: downstreamIndexColumns = s.schemaTracker.GetAvailableDownStreamUKIndexInfo(tableID, oriOldValues) } - dmls = append(dmls, newDML(update, param.safeMode, param.targetTableID, param.sourceTable, oldValues, changedValues, oriOldValues, oriChangedValues, columns, ti, downstreamIndexColumns)) + dmls = append(dmls, newDML(update, param.safeMode, param.targetTableID, param.sourceTable, oldValues, changedValues, oriOldValues, oriChangedValues, columns, ti, downstreamIndexColumns, downstreamTableInfo)) } return dmls, nil @@ -199,10 +213,11 @@ func (s *Syncer) genAndFilterDeleteDMLs(tctx *tcontext.Context, param *genDMLPar ) // if downstream pk/uk(not null) exists, then use downstream pk/uk(not null) - downstreamIndexColumns, err := s.schemaTracker.GetDownStreamIndexInfo(tctx, tableID, ti) + downstreamTableInfo, err := s.schemaTracker.GetDownStreamTableInfo(tctx, tableID, ti) if err != nil { return nil, err } + downstreamIndexColumns := downstreamTableInfo.AbsoluteUKIndexInfo RowLoop: for _, data := range dataSeq { @@ -227,7 +242,7 @@ RowLoop: downstreamIndexColumns = s.schemaTracker.GetAvailableDownStreamUKIndexInfo(tableID, value) } - dmls = append(dmls, newDML(del, false, param.targetTableID, param.sourceTable, nil, value, nil, value, ti.Columns, ti, downstreamIndexColumns)) + dmls = append(dmls, newDML(del, false, param.targetTableID, param.sourceTable, nil, value, nil, value, ti.Columns, ti, downstreamIndexColumns, downstreamTableInfo)) } return dmls, nil @@ -451,34 +466,36 @@ func checkLogColumns(skipped [][]int) error { // DML stores param for DML.
type DML struct { - targetTableID string - sourceTable *filter.Table - op opType - oldValues []interface{} // only for update SQL - values []interface{} - columns []*model.ColumnInfo - sourceTableInfo *model.TableInfo - originOldValues []interface{} // only for update SQL - originValues []interface{} // use to gen key and `WHERE` - safeMode bool - key string // use to detect causality - downstreamIndexInfo *model.IndexInfo + targetTableID string + sourceTable *filter.Table + op opType + oldValues []interface{} // only for update SQL + values []interface{} + columns []*model.ColumnInfo + sourceTableInfo *model.TableInfo + originOldValues []interface{} // only for update SQL + originValues []interface{} // use to gen key and `WHERE` + safeMode bool + key string // use to detect causality + pickedDownstreamIndexInfo *model.IndexInfo // index picked from the downstream pk/uk(not null) or a uk whose values are not null; only used in genWhere + downstreamTableInfo *schema.DownstreamTableInfo // downstream table info } // newDML creates DML. -func newDML(op opType, safeMode bool, targetTableID string, sourceTable *filter.Table, oldValues, values, originOldValues, originValues []interface{}, columns []*model.ColumnInfo, sourceTableInfo *model.TableInfo, downstreamIndexInfo *model.IndexInfo) *DML { +func newDML(op opType, safeMode bool, targetTableID string, sourceTable *filter.Table, oldValues, values, originOldValues, originValues []interface{}, columns []*model.ColumnInfo, sourceTableInfo *model.TableInfo, pickedDownstreamIndexInfo *model.IndexInfo, downstreamTableInfo *schema.DownstreamTableInfo) *DML { return &DML{ - op: op, - safeMode: safeMode, - targetTableID: targetTableID, - sourceTable: sourceTable, - oldValues: oldValues, - values: values, - columns: columns, - sourceTableInfo: sourceTableInfo, - originOldValues: originOldValues, - originValues: originValues, - downstreamIndexInfo: downstreamIndexInfo, + op: op, + safeMode: safeMode, + targetTableID: targetTableID, + sourceTable: sourceTable, + oldValues: oldValues, + values: values, + columns: columns, + sourceTableInfo: sourceTableInfo, + originOldValues: originOldValues, + originValues: originValues, + pickedDownstreamIndexInfo: pickedDownstreamIndexInfo, + downstreamTableInfo: downstreamTableInfo, } } @@ -511,7 +528,7 @@ func updateToDelAndInsert(updateDML *DML) (*DML, *DML) { // identifyColumns gets columns of unique not null index. // This is used for compact. func (dml *DML) identifyColumns() []string { - if defaultIndexColumns := findFitIndex(dml.sourceTableInfo); defaultIndexColumns != nil { + if defaultIndexColumns := dml.downstreamTableInfo.AbsoluteUKIndexInfo; defaultIndexColumns != nil { columns := make([]string, 0, len(defaultIndexColumns.Columns)) for _, column := range defaultIndexColumns.Columns { columns = append(columns, column.Name.O) @@ -524,7 +541,7 @@ // identifyValues gets values of unique not null index. // This is used for compact.
func (dml *DML) identifyValues() []interface{} { - if defaultIndexColumns := findFitIndex(dml.sourceTableInfo); defaultIndexColumns != nil { + if defaultIndexColumns := dml.downstreamTableInfo.AbsoluteUKIndexInfo; defaultIndexColumns != nil { values := make([]interface{}, 0, len(defaultIndexColumns.Columns)) for _, column := range defaultIndexColumns.Columns { values = append(values, dml.values[column.Offset]) @@ -537,7 +554,7 @@ func (dml *DML) identifyValues() []interface{} { // oldIdentifyValues gets old values of unique not null index. // only for update SQL. func (dml *DML) oldIdentifyValues() []interface{} { - if defaultIndexColumns := findFitIndex(dml.sourceTableInfo); defaultIndexColumns != nil { + if defaultIndexColumns := dml.downstreamTableInfo.AbsoluteUKIndexInfo; defaultIndexColumns != nil { values := make([]interface{}, 0, len(defaultIndexColumns.Columns)) for _, column := range defaultIndexColumns.Columns { values = append(values, dml.oldValues[column.Offset]) @@ -583,11 +600,11 @@ func (dml *DML) identifyKeys() []string { var keys []string // for UPDATE statement if dml.originOldValues != nil { - keys = append(keys, genMultipleKeys(dml.sourceTableInfo, dml.originOldValues, dml.targetTableID)...) + keys = append(keys, genMultipleKeys(dml.downstreamTableInfo, dml.sourceTableInfo, dml.originOldValues, dml.targetTableID)...) } if dml.originValues != nil { - keys = append(keys, genMultipleKeys(dml.sourceTableInfo, dml.originValues, dml.targetTableID)...) + keys = append(keys, genMultipleKeys(dml.downstreamTableInfo, dml.sourceTableInfo, dml.originValues, dml.targetTableID)...) } return keys } @@ -610,8 +627,8 @@ func (dml *DML) whereColumnsAndValues() ([]string, []interface{}) { values = dml.originOldValues } - if dml.downstreamIndexInfo != nil { - columns, values = getColumnData(dml.sourceTableInfo.Columns, dml.downstreamIndexInfo, values) + if dml.pickedDownstreamIndexInfo != nil { + columns, values = getColumnData(dml.sourceTableInfo.Columns, dml.pickedDownstreamIndexInfo, values) } columnNames := make([]string, 0, len(columns)) @@ -668,21 +685,10 @@ func genKeyList(table string, columns []*model.ColumnInfo, dataSeq []interface{} // genMultipleKeys gens keys with UNIQUE NOT NULL value. // if not UNIQUE NOT NULL value, use table name instead. -func genMultipleKeys(ti *model.TableInfo, value []interface{}, table string) []string { - multipleKeys := make([]string, 0, len(ti.Indices)+1) - if ti.PKIsHandle { - if pk := ti.GetPkColInfo(); pk != nil { - cols := []*model.ColumnInfo{pk} - vals := []interface{}{value[pk.Offset]} - multipleKeys = append(multipleKeys, genKeyList(table, cols, vals)) - } - } +func genMultipleKeys(downstreamTableInfo *schema.DownstreamTableInfo, ti *model.TableInfo, value []interface{}, table string) []string { + multipleKeys := make([]string, 0, len(downstreamTableInfo.AvailableUKIndexList)) - for _, indexCols := range ti.Indices { - // PK also has a true Unique - if !indexCols.Unique { - continue - } + for _, indexCols := range downstreamTableInfo.AvailableUKIndexList { cols, vals := getColumnData(ti.Columns, indexCols, value) key := genKeyList(table, cols, vals) if len(key) > 0 { // ignore `null` value. @@ -933,16 +939,15 @@ func genSQLMultipleRows(op dmlOpType, dmls []*DML) (queries []string, args [][]i // sameColumns check whether two DMLs have same columns. func sameColumns(lhs *DML, rhs *DML) bool { - // if source table is same, columns will be same. 
- if lhs.sourceTable.Schema == rhs.sourceTable.Schema && lhs.sourceTable.Name == rhs.sourceTable.Name { - return true - } - var lhsCols, rhsCols []string if lhs.op == del { lhsCols, _ = lhs.whereColumnsAndValues() rhsCols, _ = rhs.whereColumnsAndValues() } else { + // if source table is same, columns will be same. + if lhs.sourceTable.Schema == rhs.sourceTable.Schema && lhs.sourceTable.Name == rhs.sourceTable.Name { + return true + } lhsCols = lhs.columnNames() rhsCols = rhs.columnNames() } diff --git a/dm/syncer/dml_test.go b/dm/syncer/dml_test.go index 3d892066a9e..fd0d3bd43a2 100644 --- a/dm/syncer/dml_test.go +++ b/dm/syncer/dml_test.go @@ -18,6 +18,7 @@ import ( "strings" . "github.com/pingcap/check" + "github.com/pingcap/ticdc/dm/pkg/schema" "github.com/pingcap/tidb-tools/pkg/filter" tiddl "github.com/pingcap/tidb/ddl" @@ -171,7 +172,7 @@ func (s *testSyncerSuite) TestGenMultipleKeys(c *C) { // non-integer primary key schema: `create table t65(a int unique, b varchar(16) primary key)`, values: []interface{}{16, "xyz"}, - keys: []string{"16.a.table", "xyz.b.table"}, + keys: []string{"xyz.b.table", "16.a.table"}, }, { // primary key of multiple columns @@ -214,14 +215,16 @@ func (s *testSyncerSuite) TestGenMultipleKeys(c *C) { } for i, tc := range testCases { - schema := tc.schema + schemaStr := tc.schema assert := func(obtained interface{}, checker Checker, args ...interface{}) { - c.Assert(obtained, checker, append(args, Commentf("test case schema: %s", schema))...) + c.Assert(obtained, checker, append(args, Commentf("test case schema: %s", schemaStr))...) } ti, err := createTableInfo(p, se, int64(i+1), tc.schema) assert(err, IsNil) - keys := genMultipleKeys(ti, tc.values, "table") + dti := schema.GetDownStreamTi(ti, ti) + assert(dti, NotNil) + keys := genMultipleKeys(dti, ti, tc.values, "table") assert(keys, DeepEquals, tc.keys) } } @@ -254,22 +257,22 @@ func (s *testSyncerSuite) TestGenWhere(c *C) { values []interface{} }{ { - newDML(del, false, "", &filter.Table{}, nil, []interface{}{1, 2, 3, "haha"}, nil, []interface{}{1, 2, 3, "haha"}, ti1.Columns, ti1, ti1Index), + newDML(del, false, "", &filter.Table{}, nil, []interface{}{1, 2, 3, "haha"}, nil, []interface{}{1, 2, 3, "haha"}, ti1.Columns, ti1, ti1Index, nil), "`id` = ?", []interface{}{1}, }, { - newDML(update, false, "", &filter.Table{}, []interface{}{1, 2, 3, "haha"}, []interface{}{4, 5, 6, "hihi"}, []interface{}{1, 2, 3, "haha"}, []interface{}{4, 5, 6, "hihi"}, ti1.Columns, ti1, ti1Index), + newDML(update, false, "", &filter.Table{}, []interface{}{1, 2, 3, "haha"}, []interface{}{4, 5, 6, "hihi"}, []interface{}{1, 2, 3, "haha"}, []interface{}{4, 5, 6, "hihi"}, ti1.Columns, ti1, ti1Index, nil), "`id` = ?", []interface{}{1}, }, { - newDML(del, false, "", &filter.Table{}, nil, []interface{}{1, 2, 3, "haha"}, nil, []interface{}{1, 2, 3, "haha"}, ti2.Columns, ti2, nil), + newDML(del, false, "", &filter.Table{}, nil, []interface{}{1, 2, 3, "haha"}, nil, []interface{}{1, 2, 3, "haha"}, ti2.Columns, ti2, nil, nil), "`id` = ? AND `col1` = ? AND `col2` = ? AND `name` = ?", []interface{}{1, 2, 3, "haha"}, }, { - newDML(update, false, "", &filter.Table{}, []interface{}{1, 2, 3, "haha"}, []interface{}{4, 5, 6, "hihi"}, []interface{}{1, 2, 3, "haha"}, []interface{}{4, 5, 6, "hihi"}, ti2.Columns, ti2, nil), + newDML(update, false, "", &filter.Table{}, []interface{}{1, 2, 3, "haha"}, []interface{}{4, 5, 6, "hihi"}, []interface{}{1, 2, 3, "haha"}, []interface{}{4, 5, 6, "hihi"}, ti2.Columns, ti2, nil, nil), "`id` = ? AND `col1` = ? 
AND `col2` = ? AND `name` = ?", []interface{}{1, 2, 3, "haha"}, }, @@ -308,27 +311,27 @@ func (s *testSyncerSuite) TestGenSQL(c *C) { args [][]interface{} }{ { - newDML(insert, false, "`targetSchema`.`targetTable`", &filter.Table{}, nil, []interface{}{1, 2, 3, "haha"}, nil, []interface{}{1, 2, 3, "haha"}, ti.Columns, ti, tiIndex), + newDML(insert, false, "`targetSchema`.`targetTable`", &filter.Table{}, nil, []interface{}{1, 2, 3, "haha"}, nil, []interface{}{1, 2, 3, "haha"}, ti.Columns, ti, tiIndex, nil), []string{"INSERT INTO `targetSchema`.`targetTable` (`id`,`col1`,`col2`,`name`) VALUES (?,?,?,?)"}, [][]interface{}{{1, 2, 3, "haha"}}, }, { - newDML(insert, true, "`targetSchema`.`targetTable`", &filter.Table{}, nil, []interface{}{1, 2, 3, "haha"}, nil, []interface{}{1, 2, 3, "haha"}, ti.Columns, ti, tiIndex), + newDML(insert, true, "`targetSchema`.`targetTable`", &filter.Table{}, nil, []interface{}{1, 2, 3, "haha"}, nil, []interface{}{1, 2, 3, "haha"}, ti.Columns, ti, tiIndex, nil), []string{"INSERT INTO `targetSchema`.`targetTable` (`id`,`col1`,`col2`,`name`) VALUES (?,?,?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`col1`=VALUES(`col1`),`col2`=VALUES(`col2`),`name`=VALUES(`name`)"}, [][]interface{}{{1, 2, 3, "haha"}}, }, { - newDML(del, false, "`targetSchema`.`targetTable`", &filter.Table{}, nil, []interface{}{1, 2, 3, "haha"}, nil, []interface{}{1, 2, 3, "haha"}, ti.Columns, ti, tiIndex), + newDML(del, false, "`targetSchema`.`targetTable`", &filter.Table{}, nil, []interface{}{1, 2, 3, "haha"}, nil, []interface{}{1, 2, 3, "haha"}, ti.Columns, ti, tiIndex, nil), []string{"DELETE FROM `targetSchema`.`targetTable` WHERE `id` = ? LIMIT 1"}, [][]interface{}{{1}}, }, { - newDML(update, false, "`targetSchema`.`targetTable`", &filter.Table{}, []interface{}{1, 2, 3, "haha"}, []interface{}{4, 5, 6, "hihi"}, []interface{}{1, 2, 3, "haha"}, []interface{}{1, 2, 3, "haha"}, ti.Columns, ti, tiIndex), + newDML(update, false, "`targetSchema`.`targetTable`", &filter.Table{}, []interface{}{1, 2, 3, "haha"}, []interface{}{4, 5, 6, "hihi"}, []interface{}{1, 2, 3, "haha"}, []interface{}{1, 2, 3, "haha"}, ti.Columns, ti, tiIndex, nil), []string{"UPDATE `targetSchema`.`targetTable` SET `id` = ?, `col1` = ?, `col2` = ?, `name` = ? WHERE `id` = ? LIMIT 1"}, [][]interface{}{{4, 5, 6, "hihi", 1}}, }, { - newDML(update, true, "`targetSchema`.`targetTable`", &filter.Table{}, []interface{}{1, 2, 3, "haha"}, []interface{}{4, 5, 6, "hihi"}, []interface{}{1, 2, 3, "haha"}, []interface{}{1, 2, 3, "haha"}, ti.Columns, ti, tiIndex), + newDML(update, true, "`targetSchema`.`targetTable`", &filter.Table{}, []interface{}{1, 2, 3, "haha"}, []interface{}{4, 5, 6, "hihi"}, []interface{}{1, 2, 3, "haha"}, []interface{}{1, 2, 3, "haha"}, ti.Columns, ti, tiIndex, nil), []string{"DELETE FROM `targetSchema`.`targetTable` WHERE `id` = ? LIMIT 1", "INSERT INTO `targetSchema`.`targetTable` (`id`,`col1`,`col2`,`name`) VALUES (?,?,?,?) 
ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`col1`=VALUES(`col1`),`col2`=VALUES(`col2`),`name`=VALUES(`name`)"}, [][]interface{}{{1}, {4, 5, 6, "hihi"}}, }, @@ -375,6 +378,9 @@ func (s *testSyncerSuite) TestGenDMLWithSameOp(c *C) { Length: types.UnspecifiedLength, }}, } + downTi11 := &schema.DownstreamTableInfo{ + AbsoluteUKIndexInfo: ti11Index, + } ti12, err := createTableInfo(p, se, 0, schema12) c.Assert(err, IsNil) ti12Index := &model.IndexInfo{ @@ -389,6 +395,9 @@ func (s *testSyncerSuite) TestGenDMLWithSameOp(c *C) { Length: types.UnspecifiedLength, }}, } + downTi12 := &schema.DownstreamTableInfo{ + AbsoluteUKIndexInfo: ti12Index, + } ti21, err := createTableInfo(p, se, 0, schema21) c.Assert(err, IsNil) ti21Index := &model.IndexInfo{ @@ -403,6 +412,9 @@ func (s *testSyncerSuite) TestGenDMLWithSameOp(c *C) { Length: types.UnspecifiedLength, }}, } + downTi21 := &schema.DownstreamTableInfo{ + AbsoluteUKIndexInfo: ti21Index, + } ti22, err := createTableInfo(p, se, 0, schema22) c.Assert(err, IsNil) ti22Index := &model.IndexInfo{ @@ -417,56 +429,59 @@ func (s *testSyncerSuite) TestGenDMLWithSameOp(c *C) { Length: types.UnspecifiedLength, }}, } + downTi22 := &schema.DownstreamTableInfo{ + AbsoluteUKIndexInfo: ti22Index, + } dmls := []*DML{ // insert - newDML(insert, true, targetTableID1, sourceTable11, nil, []interface{}{1, 1, "a"}, nil, []interface{}{1, 1, "a"}, ti11.Columns, ti11, ti11Index), - newDML(insert, true, targetTableID1, sourceTable11, nil, []interface{}{2, 2, "b"}, nil, []interface{}{2, 2, "b"}, ti11.Columns, ti11, ti11Index), - newDML(insert, true, targetTableID1, sourceTable12, nil, []interface{}{3, 3, "c"}, nil, []interface{}{3, 3, "c"}, ti12.Columns, ti12, ti12Index), + newDML(insert, true, targetTableID1, sourceTable11, nil, []interface{}{1, 1, "a"}, nil, []interface{}{1, 1, "a"}, ti11.Columns, ti11, ti11Index, downTi11), + newDML(insert, true, targetTableID1, sourceTable11, nil, []interface{}{2, 2, "b"}, nil, []interface{}{2, 2, "b"}, ti11.Columns, ti11, ti11Index, downTi11), + newDML(insert, true, targetTableID1, sourceTable12, nil, []interface{}{3, 3, "c"}, nil, []interface{}{3, 3, "c"}, ti12.Columns, ti12, ti12Index, downTi12), // update no index - newDML(update, true, targetTableID1, sourceTable11, []interface{}{1, 1, "a"}, []interface{}{1, 1, "aa"}, []interface{}{1, 1, "a"}, []interface{}{1, 1, "aa"}, ti11.Columns, ti11, ti11Index), - newDML(update, true, targetTableID1, sourceTable11, []interface{}{2, 2, "b"}, []interface{}{2, 2, "bb"}, []interface{}{2, 2, "b"}, []interface{}{2, 2, "bb"}, ti11.Columns, ti11, ti11Index), - newDML(update, true, targetTableID1, sourceTable12, []interface{}{3, 3, "c"}, []interface{}{3, 3, "cc"}, []interface{}{3, 3, "c"}, []interface{}{3, 3, "cc"}, ti12.Columns, ti12, ti12Index), + newDML(update, true, targetTableID1, sourceTable11, []interface{}{1, 1, "a"}, []interface{}{1, 1, "aa"}, []interface{}{1, 1, "a"}, []interface{}{1, 1, "aa"}, ti11.Columns, ti11, ti11Index, downTi11), + newDML(update, true, targetTableID1, sourceTable11, []interface{}{2, 2, "b"}, []interface{}{2, 2, "bb"}, []interface{}{2, 2, "b"}, []interface{}{2, 2, "bb"}, ti11.Columns, ti11, ti11Index, downTi11), + newDML(update, true, targetTableID1, sourceTable12, []interface{}{3, 3, "c"}, []interface{}{3, 3, "cc"}, []interface{}{3, 3, "c"}, []interface{}{3, 3, "cc"}, ti12.Columns, ti12, ti12Index, downTi12), // update uk - newDML(update, true, targetTableID1, sourceTable11, []interface{}{1, 1, "aa"}, []interface{}{1, 4, "aa"}, []interface{}{1, 1, "aa"}, []interface{}{1, 4, 
"aa"}, ti11.Columns, ti11, ti11Index), - newDML(update, true, targetTableID1, sourceTable11, []interface{}{2, 2, "bb"}, []interface{}{2, 5, "bb"}, []interface{}{2, 2, "bb"}, []interface{}{2, 5, "bb"}, ti11.Columns, ti11, ti11Index), - newDML(update, true, targetTableID1, sourceTable12, []interface{}{3, 3, "cc"}, []interface{}{3, 6, "cc"}, []interface{}{3, 3, "cc"}, []interface{}{3, 6, "cc"}, ti12.Columns, ti12, ti12Index), + newDML(update, true, targetTableID1, sourceTable11, []interface{}{1, 1, "aa"}, []interface{}{1, 4, "aa"}, []interface{}{1, 1, "aa"}, []interface{}{1, 4, "aa"}, ti11.Columns, ti11, ti11Index, downTi11), + newDML(update, true, targetTableID1, sourceTable11, []interface{}{2, 2, "bb"}, []interface{}{2, 5, "bb"}, []interface{}{2, 2, "bb"}, []interface{}{2, 5, "bb"}, ti11.Columns, ti11, ti11Index, downTi11), + newDML(update, true, targetTableID1, sourceTable12, []interface{}{3, 3, "cc"}, []interface{}{3, 6, "cc"}, []interface{}{3, 3, "cc"}, []interface{}{3, 6, "cc"}, ti12.Columns, ti12, ti12Index, downTi12), // update pk - newDML(update, true, targetTableID1, sourceTable11, []interface{}{1, 4, "aa"}, []interface{}{4, 4, "aa"}, []interface{}{1, 1, "aa"}, []interface{}{4, 4, "aa"}, ti11.Columns, ti11, ti11Index), - newDML(update, true, targetTableID1, sourceTable11, []interface{}{2, 5, "bb"}, []interface{}{5, 5, "bb"}, []interface{}{2, 2, "bb"}, []interface{}{5, 5, "bb"}, ti11.Columns, ti11, ti11Index), - newDML(update, true, targetTableID1, sourceTable12, []interface{}{3, 6, "cc"}, []interface{}{6, 6, "cc"}, []interface{}{3, 3, "cc"}, []interface{}{6, 6, "cc"}, ti12.Columns, ti12, ti12Index), + newDML(update, true, targetTableID1, sourceTable11, []interface{}{1, 4, "aa"}, []interface{}{4, 4, "aa"}, []interface{}{1, 1, "aa"}, []interface{}{4, 4, "aa"}, ti11.Columns, ti11, ti11Index, downTi11), + newDML(update, true, targetTableID1, sourceTable11, []interface{}{2, 5, "bb"}, []interface{}{5, 5, "bb"}, []interface{}{2, 2, "bb"}, []interface{}{5, 5, "bb"}, ti11.Columns, ti11, ti11Index, downTi11), + newDML(update, true, targetTableID1, sourceTable12, []interface{}{3, 6, "cc"}, []interface{}{6, 6, "cc"}, []interface{}{3, 3, "cc"}, []interface{}{6, 6, "cc"}, ti12.Columns, ti12, ti12Index, downTi12), // delete - newDML(del, true, targetTableID1, sourceTable11, nil, []interface{}{4, 4, "aa"}, nil, []interface{}{4, 4, "aa"}, ti11.Columns, ti11, ti11Index), - newDML(del, true, targetTableID1, sourceTable11, nil, []interface{}{5, 5, "bb"}, nil, []interface{}{5, 5, "bb"}, ti11.Columns, ti11, ti11Index), - newDML(del, true, targetTableID1, sourceTable12, nil, []interface{}{6, 6, "cc"}, nil, []interface{}{6, 6, "cc"}, ti12.Columns, ti12, ti12Index), + newDML(del, true, targetTableID1, sourceTable11, nil, []interface{}{4, 4, "aa"}, nil, []interface{}{4, 4, "aa"}, ti11.Columns, ti11, ti11Index, downTi11), + newDML(del, true, targetTableID1, sourceTable11, nil, []interface{}{5, 5, "bb"}, nil, []interface{}{5, 5, "bb"}, ti11.Columns, ti11, ti11Index, downTi11), + newDML(del, true, targetTableID1, sourceTable12, nil, []interface{}{6, 6, "cc"}, nil, []interface{}{6, 6, "cc"}, ti12.Columns, ti12, ti12Index, downTi12), // target table 2 // insert - newDML(insert, true, targetTableID2, sourceTable21, nil, []interface{}{1, 1, "a"}, nil, []interface{}{1, 1, "a"}, ti21.Columns, ti21, ti21Index), - newDML(insert, false, targetTableID2, sourceTable21, nil, []interface{}{2, 2, "b"}, nil, []interface{}{2, 2, "b"}, ti21.Columns, ti21, ti21Index), - newDML(insert, false, targetTableID2, sourceTable22, nil, 
[]interface{}{3, 3, "c"}, nil, []interface{}{3, 3, "c"}, ti22.Columns, ti22, ti22Index), + newDML(insert, true, targetTableID2, sourceTable21, nil, []interface{}{1, 1, "a"}, nil, []interface{}{1, 1, "a"}, ti21.Columns, ti21, ti21Index, downTi21), + newDML(insert, false, targetTableID2, sourceTable21, nil, []interface{}{2, 2, "b"}, nil, []interface{}{2, 2, "b"}, ti21.Columns, ti21, ti21Index, downTi21), + newDML(insert, false, targetTableID2, sourceTable22, nil, []interface{}{3, 3, "c"}, nil, []interface{}{3, 3, "c"}, ti22.Columns, ti22, ti22Index, downTi22), // update no index - newDML(update, false, targetTableID2, sourceTable21, []interface{}{1, 1, "a"}, []interface{}{1, 1, "aa"}, []interface{}{1, 1, "a"}, []interface{}{1, 1, "aa"}, ti21.Columns, ti21, ti21Index), - newDML(update, false, targetTableID2, sourceTable21, []interface{}{2, 2, "b"}, []interface{}{2, 2, "bb"}, []interface{}{2, 2, "b"}, []interface{}{2, 2, "bb"}, ti21.Columns, ti21, ti21Index), - newDML(update, false, targetTableID2, sourceTable22, []interface{}{3, 3, "c"}, []interface{}{3, 3, "cc"}, []interface{}{3, 3, "c"}, []interface{}{3, 3, "cc"}, ti22.Columns, ti22, ti21Index), + newDML(update, false, targetTableID2, sourceTable21, []interface{}{1, 1, "a"}, []interface{}{1, 1, "aa"}, []interface{}{1, 1, "a"}, []interface{}{1, 1, "aa"}, ti21.Columns, ti21, ti21Index, downTi21), + newDML(update, false, targetTableID2, sourceTable21, []interface{}{2, 2, "b"}, []interface{}{2, 2, "bb"}, []interface{}{2, 2, "b"}, []interface{}{2, 2, "bb"}, ti21.Columns, ti21, ti21Index, downTi21), + newDML(update, false, targetTableID2, sourceTable22, []interface{}{3, 3, "c"}, []interface{}{3, 3, "cc"}, []interface{}{3, 3, "c"}, []interface{}{3, 3, "cc"}, ti22.Columns, ti22, ti22Index, downTi22), // update uk - newDML(update, false, targetTableID2, sourceTable21, []interface{}{1, 1, "aa"}, []interface{}{1, 4, "aa"}, []interface{}{1, 1, "aa"}, []interface{}{1, 4, "aa"}, ti21.Columns, ti21, ti21Index), - newDML(update, false, targetTableID2, sourceTable21, []interface{}{2, 2, "bb"}, []interface{}{2, 5, "bb"}, []interface{}{2, 2, "bb"}, []interface{}{2, 5, "bb"}, ti21.Columns, ti21, ti21Index), - newDML(update, false, targetTableID2, sourceTable22, []interface{}{3, 3, "cc"}, []interface{}{3, 6, "cc"}, []interface{}{3, 3, "cc"}, []interface{}{3, 6, "cc"}, ti22.Columns, ti22, ti22Index), + newDML(update, false, targetTableID2, sourceTable21, []interface{}{1, 1, "aa"}, []interface{}{1, 4, "aa"}, []interface{}{1, 1, "aa"}, []interface{}{1, 4, "aa"}, ti21.Columns, ti21, ti21Index, downTi21), + newDML(update, false, targetTableID2, sourceTable21, []interface{}{2, 2, "bb"}, []interface{}{2, 5, "bb"}, []interface{}{2, 2, "bb"}, []interface{}{2, 5, "bb"}, ti21.Columns, ti21, ti21Index, downTi21), + newDML(update, false, targetTableID2, sourceTable22, []interface{}{3, 3, "cc"}, []interface{}{3, 6, "cc"}, []interface{}{3, 3, "cc"}, []interface{}{3, 6, "cc"}, ti22.Columns, ti22, ti22Index, downTi22), // update pk - newDML(update, false, targetTableID2, sourceTable21, []interface{}{1, 4, "aa"}, []interface{}{4, 4, "aa"}, []interface{}{1, 1, "aa"}, []interface{}{4, 4, "aa"}, ti21.Columns, ti21, ti21Index), - newDML(update, false, targetTableID2, sourceTable21, []interface{}{2, 5, "bb"}, []interface{}{5, 5, "bb"}, []interface{}{2, 2, "bb"}, []interface{}{5, 5, "bb"}, ti21.Columns, ti21, ti21Index), - newDML(update, false, targetTableID2, sourceTable22, []interface{}{3, 6, "cc"}, []interface{}{6, 6, "cc"}, []interface{}{3, 3, "cc"}, []interface{}{6, 6, "cc"}, 
diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go
index 0a5f62788d3..2f02e564073 100644
--- a/dm/syncer/syncer.go
+++ b/dm/syncer/syncer.go
@@ -2064,7 +2064,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err
 		}
 		param.safeMode = ec.safeMode
-		dmls, err = s.genAndFilterInsertDMLs(param, exprFilter)
+		dmls, err = s.genAndFilterInsertDMLs(ec.tctx, param, exprFilter)
 		if err != nil {
 			return terror.Annotatef(err, "gen insert sqls failed, sourceTable: %v, targetTable: %v", sourceTable, targetTable)
 		}
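The extra ec.tctx argument indicates that insert-DML generation now touches the downstream schema tracker, whose lazy initialization (GetDownStreamTableInfo) needs a *tcontext.Context for logging and the SHOW CREATE TABLE round trip. A hedged sketch of the call presumably made inside genAndFilterInsertDMLs (the function body is not part of this diff; the schemaTracker field and local variable names are assumptions):

// Assumed shape, illustrative only: fetch (and lazily initialize) the
// downstream table info, then attach it to each generated DML.
downTi, err := s.schemaTracker.GetDownStreamTableInfo(tctx, tableID, ti)
if err != nil {
	return nil, err
}
dmls = append(dmls, newDML(insert, param.safeMode, tableID, sourceTable,
	nil, value, nil, originalValue, ti.Columns, ti, tiIndex, downTi))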
diff --git a/dm/syncer/syncer_test.go b/dm/syncer/syncer_test.go
index 65adcd893bd..49741f7213a 100644
--- a/dm/syncer/syncer_test.go
+++ b/dm/syncer/syncer_test.go
@@ -787,6 +787,9 @@ func (s *testSyncerSuite) TestRun(c *C) {
 	mock.ExpectQuery("SHOW CREATE TABLE " + "`test_1`.`t_1`").WillReturnRows(
 		sqlmock.NewRows([]string{"Table", "Create Table"}).
 			AddRow("t_1", "create table t_1(id int primary key, name varchar(24))"))
+	mock.ExpectQuery("SHOW CREATE TABLE " + "`test_1`.`t_1`").WillReturnRows(
+		sqlmock.NewRows([]string{"Table", "Create Table"}).
+			AddRow("t_1", "create table t_1(id int primary key, name varchar(24), KEY `index1` (`name`))"))
 	s.mockGetServerUnixTS(mock)
 	mock.ExpectQuery("SHOW CREATE TABLE " + "`test_1`.`t_2`").WillReturnRows(
 		sqlmock.NewRows([]string{"Table", "Create Table"}).