Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dm/syncer: multiple rows use downstream schema #3308

Merged
merged 10 commits into from
Nov 9, 2021
Merged
73 changes: 39 additions & 34 deletions dm/pkg/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,14 @@ type Tracker struct {
type downstreamTracker struct {
downstreamConn *dbconn.DBConn // downstream connection
stmtParser *parser.Parser // statement parser
tableInfos map[string]*downstreamTableInfo // downstream table infos
tableInfos map[string]*DownstreamTableInfo // downstream table infos
}

// downstreamTableInfo contains tableinfo and index cache.
type downstreamTableInfo struct {
tableInfo *model.TableInfo // tableInfo which comes from parse create statement syntaxtree
indexCache *model.IndexInfo // index cache include pk/uk(not null)
availableUKCache []*model.IndexInfo // index cache include uks(data not null)
// DownstreamTableInfo contains tableinfo and index cache.
type DownstreamTableInfo struct {
TableInfo *model.TableInfo // tableInfo which comes from parse create statement syntaxtree
AbsoluteUKIndexInfo *model.IndexInfo // absolute uk index is a pk/uk(not null)
AvailableUKIndexList []*model.IndexInfo // index list which is all uks
}

// NewTracker creates a new tracker. `sessionCfg` will be set as tracker's session variables if specified, or retrieve
Expand Down Expand Up @@ -181,7 +181,7 @@ func NewTracker(ctx context.Context, task string, sessionCfg map[string]string,
// init downstreamTracker
dsTracker := &downstreamTracker{
downstreamConn: downstreamConn,
tableInfos: make(map[string]*downstreamTableInfo),
tableInfos: make(map[string]*DownstreamTableInfo),
}

return &Tracker{
Expand Down Expand Up @@ -375,9 +375,9 @@ func (tr *Tracker) GetSystemVar(name string) (string, bool) {
return tr.se.GetSessionVars().GetSystemVar(name)
}

// GetDownStreamIndexInfo gets downstream PK/UK(not null) Index.
// GetDownStreamTableInfo gets downstream table info.
// note. this function will init downstreamTrack's table info.
func (tr *Tracker) GetDownStreamIndexInfo(tctx *tcontext.Context, tableID string, originTi *model.TableInfo) (*model.IndexInfo, error) {
func (tr *Tracker) GetDownStreamTableInfo(tctx *tcontext.Context, tableID string, originTi *model.TableInfo) (*DownstreamTableInfo, error) {
dti, ok := tr.dsTracker.tableInfos[tableID]
if !ok {
tctx.Logger.Info("Downstream schema tracker init. ", zap.String("tableID", tableID))
Expand All @@ -387,32 +387,28 @@ func (tr *Tracker) GetDownStreamIndexInfo(tctx *tcontext.Context, tableID string
return nil, err
}

dti = getDownStreamTi(ti, originTi)
dti = GetDownStreamTi(ti, originTi)
tr.dsTracker.tableInfos[tableID] = dti
}
return dti.indexCache, nil
return dti, nil
}

// GetAvailableDownStreamUKIndexInfo gets available downstream UK whose data is not null.
// note. this function will not init downstreamTrack.
func (tr *Tracker) GetAvailableDownStreamUKIndexInfo(tableID string, data []interface{}) *model.IndexInfo {
dti, ok := tr.dsTracker.tableInfos[tableID]

if !ok || len(dti.availableUKCache) == 0 {
if !ok || len(dti.AvailableUKIndexList) == 0 {
return nil
}
// func for check data is not null
fn := func(i int) bool {
return data[i] != nil
}

for i, uk := range dti.availableUKCache {
for _, uk := range dti.AvailableUKIndexList {
// check uk's column data is not null
if isSpecifiedIndexColumn(uk, fn) {
if i != 0 {
// exchange available uk to the first of the array to reduce judgements for next row
dti.availableUKCache[0], dti.availableUKCache[i] = dti.availableUKCache[i], dti.availableUKCache[0]
}
return uk
}
}
Expand Down Expand Up @@ -487,36 +483,38 @@ func (tr *Tracker) initDownStreamSQLModeAndParser(tctx *tcontext.Context) error
return nil
}

// getDownStreamTi constructs downstreamTable index cache by tableinfo.
func getDownStreamTi(ti *model.TableInfo, originTi *model.TableInfo) *downstreamTableInfo {
// GetDownStreamTi constructs downstreamTable index cache by tableinfo.
func GetDownStreamTi(ti *model.TableInfo, originTi *model.TableInfo) *DownstreamTableInfo {
var (
indexCache *model.IndexInfo
availableUKCache = make([]*model.IndexInfo, 0, len(ti.Indices))
hasPk = false
absoluteUKIndexInfo *model.IndexInfo
availableUKIndexList = []*model.IndexInfo{}
hasPk = false
absoluteUKPosition = -1
)

// func for check not null constraint
fn := func(i int) bool {
return mysql.HasNotNullFlag(ti.Columns[i].Flag)
}

for _, idx := range ti.Indices {
for i, idx := range ti.Indices {
if !idx.Primary && !idx.Unique {
continue
}
indexRedirect := redirectIndexKeys(idx, originTi)
if indexRedirect == nil {
continue
}
availableUKIndexList = append(availableUKIndexList, indexRedirect)
if idx.Primary {
indexCache = indexRedirect
absoluteUKIndexInfo = indexRedirect
absoluteUKPosition = i
hasPk = true
} else if idx.Unique {
} else {
// second check not null unique key
if !hasPk && isSpecifiedIndexColumn(idx, fn) {
indexCache = indexRedirect
} else {
availableUKCache = append(availableUKCache, indexRedirect)
if absoluteUKIndexInfo == nil && isSpecifiedIndexColumn(idx, fn) {
absoluteUKIndexInfo = indexRedirect
absoluteUKPosition = i
}
}
}
Expand All @@ -526,14 +524,21 @@ func getDownStreamTi(ti *model.TableInfo, originTi *model.TableInfo) *downstream
if !hasPk {
exPk := redirectIndexKeys(handlePkExCase(ti), originTi)
if exPk != nil {
indexCache = exPk
absoluteUKIndexInfo = exPk
absoluteUKPosition = len(availableUKIndexList)
availableUKIndexList = append(availableUKIndexList, absoluteUKIndexInfo)
}
}

return &downstreamTableInfo{
tableInfo: ti,
indexCache: indexCache,
availableUKCache: availableUKCache,
// move absoluteUKIndexInfo to the first in availableUKIndexList
if absoluteUKPosition != -1 && len(availableUKIndexList) > 1 {
availableUKIndexList[0], availableUKIndexList[absoluteUKPosition] = availableUKIndexList[absoluteUKPosition], availableUKIndexList[0]
}

return &DownstreamTableInfo{
TableInfo: ti,
AbsoluteUKIndexInfo: absoluteUKIndexInfo,
AvailableUKIndexList: availableUKIndexList,
}
}

Expand Down
Loading