Skip to content

Commit

Permalink
dm/syncer: multiple rows use downstream schema (#3308)
Browse files Browse the repository at this point in the history
  • Loading branch information
WizardXiao authored Nov 9, 2021
1 parent 6c003ad commit b4c6b17
Show file tree
Hide file tree
Showing 9 changed files with 265 additions and 234 deletions.
73 changes: 39 additions & 34 deletions dm/pkg/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,14 @@ type Tracker struct {
type downstreamTracker struct {
downstreamConn *dbconn.DBConn // downstream connection
stmtParser *parser.Parser // statement parser
tableInfos map[string]*downstreamTableInfo // downstream table infos
tableInfos map[string]*DownstreamTableInfo // downstream table infos
}

// downstreamTableInfo contains tableinfo and index cache.
type downstreamTableInfo struct {
tableInfo *model.TableInfo // tableInfo which comes from parse create statement syntaxtree
indexCache *model.IndexInfo // index cache include pk/uk(not null)
availableUKCache []*model.IndexInfo // index cache include uks(data not null)
// DownstreamTableInfo contains tableinfo and index cache.
type DownstreamTableInfo struct {
TableInfo *model.TableInfo // tableInfo which comes from parse create statement syntaxtree
AbsoluteUKIndexInfo *model.IndexInfo // absolute uk index is a pk/uk(not null)
AvailableUKIndexList []*model.IndexInfo // index list which is all uks
}

// NewTracker creates a new tracker. `sessionCfg` will be set as tracker's session variables if specified, or retrieve
Expand Down Expand Up @@ -181,7 +181,7 @@ func NewTracker(ctx context.Context, task string, sessionCfg map[string]string,
// init downstreamTracker
dsTracker := &downstreamTracker{
downstreamConn: downstreamConn,
tableInfos: make(map[string]*downstreamTableInfo),
tableInfos: make(map[string]*DownstreamTableInfo),
}

return &Tracker{
Expand Down Expand Up @@ -375,9 +375,9 @@ func (tr *Tracker) GetSystemVar(name string) (string, bool) {
return tr.se.GetSessionVars().GetSystemVar(name)
}

// GetDownStreamIndexInfo gets downstream PK/UK(not null) Index.
// GetDownStreamTableInfo gets downstream table info.
// note. this function will init downstreamTrack's table info.
func (tr *Tracker) GetDownStreamIndexInfo(tctx *tcontext.Context, tableID string, originTi *model.TableInfo) (*model.IndexInfo, error) {
func (tr *Tracker) GetDownStreamTableInfo(tctx *tcontext.Context, tableID string, originTi *model.TableInfo) (*DownstreamTableInfo, error) {
dti, ok := tr.dsTracker.tableInfos[tableID]
if !ok {
tctx.Logger.Info("Downstream schema tracker init. ", zap.String("tableID", tableID))
Expand All @@ -387,32 +387,28 @@ func (tr *Tracker) GetDownStreamIndexInfo(tctx *tcontext.Context, tableID string
return nil, err
}

dti = getDownStreamTi(ti, originTi)
dti = GetDownStreamTi(ti, originTi)
tr.dsTracker.tableInfos[tableID] = dti
}
return dti.indexCache, nil
return dti, nil
}

// GetAvailableDownStreamUKIndexInfo gets available downstream UK whose data is not null.
// note. this function will not init downstreamTrack.
func (tr *Tracker) GetAvailableDownStreamUKIndexInfo(tableID string, data []interface{}) *model.IndexInfo {
dti, ok := tr.dsTracker.tableInfos[tableID]

if !ok || len(dti.availableUKCache) == 0 {
if !ok || len(dti.AvailableUKIndexList) == 0 {
return nil
}
// func for check data is not null
fn := func(i int) bool {
return data[i] != nil
}

for i, uk := range dti.availableUKCache {
for _, uk := range dti.AvailableUKIndexList {
// check uk's column data is not null
if isSpecifiedIndexColumn(uk, fn) {
if i != 0 {
// exchange available uk to the first of the array to reduce judgements for next row
dti.availableUKCache[0], dti.availableUKCache[i] = dti.availableUKCache[i], dti.availableUKCache[0]
}
return uk
}
}
Expand Down Expand Up @@ -487,36 +483,38 @@ func (tr *Tracker) initDownStreamSQLModeAndParser(tctx *tcontext.Context) error
return nil
}

// getDownStreamTi constructs downstreamTable index cache by tableinfo.
func getDownStreamTi(ti *model.TableInfo, originTi *model.TableInfo) *downstreamTableInfo {
// GetDownStreamTi constructs downstreamTable index cache by tableinfo.
func GetDownStreamTi(ti *model.TableInfo, originTi *model.TableInfo) *DownstreamTableInfo {
var (
indexCache *model.IndexInfo
availableUKCache = make([]*model.IndexInfo, 0, len(ti.Indices))
hasPk = false
absoluteUKIndexInfo *model.IndexInfo
availableUKIndexList = []*model.IndexInfo{}
hasPk = false
absoluteUKPosition = -1
)

// func for check not null constraint
fn := func(i int) bool {
return mysql.HasNotNullFlag(ti.Columns[i].Flag)
}

for _, idx := range ti.Indices {
for i, idx := range ti.Indices {
if !idx.Primary && !idx.Unique {
continue
}
indexRedirect := redirectIndexKeys(idx, originTi)
if indexRedirect == nil {
continue
}
availableUKIndexList = append(availableUKIndexList, indexRedirect)
if idx.Primary {
indexCache = indexRedirect
absoluteUKIndexInfo = indexRedirect
absoluteUKPosition = i
hasPk = true
} else if idx.Unique {
} else {
// second check not null unique key
if !hasPk && isSpecifiedIndexColumn(idx, fn) {
indexCache = indexRedirect
} else {
availableUKCache = append(availableUKCache, indexRedirect)
if absoluteUKIndexInfo == nil && isSpecifiedIndexColumn(idx, fn) {
absoluteUKIndexInfo = indexRedirect
absoluteUKPosition = i
}
}
}
Expand All @@ -526,14 +524,21 @@ func getDownStreamTi(ti *model.TableInfo, originTi *model.TableInfo) *downstream
if !hasPk {
exPk := redirectIndexKeys(handlePkExCase(ti), originTi)
if exPk != nil {
indexCache = exPk
absoluteUKIndexInfo = exPk
absoluteUKPosition = len(availableUKIndexList)
availableUKIndexList = append(availableUKIndexList, absoluteUKIndexInfo)
}
}

return &downstreamTableInfo{
tableInfo: ti,
indexCache: indexCache,
availableUKCache: availableUKCache,
// move absoluteUKIndexInfo to the first in availableUKIndexList
if absoluteUKPosition != -1 && len(availableUKIndexList) > 1 {
availableUKIndexList[0], availableUKIndexList[absoluteUKPosition] = availableUKIndexList[absoluteUKPosition], availableUKIndexList[0]
}

return &DownstreamTableInfo{
TableInfo: ti,
AbsoluteUKIndexInfo: absoluteUKIndexInfo,
AvailableUKIndexList: availableUKIndexList,
}
}

Expand Down
Loading

0 comments on commit b4c6b17

Please sign in to comment.