Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

syncer: use downstream PK/UK to generate DML #2163

Closed
wants to merge 29 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
8f7a8cf
commit-message: update the schema tracker core code about #1895
WizardXiao Sep 23, 2021
8a0ad43
save
WizardXiao Sep 23, 2021
0acd3d5
commit-message: merge the update of schema tracker
WizardXiao Sep 23, 2021
c28676c
commit-message: change track downstream tables by use create table stmt
WizardXiao Sep 26, 2021
246c887
commit-message: change track downstream tables by use create table stmt
WizardXiao Sep 26, 2021
0376276
commit-message: change track downstream scheam info by TableInfo
WizardXiao Sep 29, 2021
823c0d1
commit-message: use TableInfo and index cache to track downstream schema
WizardXiao Sep 29, 2021
47d10cb
commit-message: use TableInfo and index cache to track downstream schema
WizardXiao Sep 29, 2021
ae43898
commit-message: add unit test and integration test
WizardXiao Oct 12, 2021
d5a4ceb
Merge branch 'master' into schema_compatible
lance6716 Oct 13, 2021
499634f
Merge branch 'master' of https://github.com/pingcap/dm into schema_co…
WizardXiao Oct 13, 2021
982e5c3
commit-message: fix fmt and retest ut
WizardXiao Oct 13, 2021
e959afd
Merge branch 'schema_compatible' of https://github.com/WizardXiao/dm …
WizardXiao Oct 13, 2021
c77d91e
commit-message: update set downstream tracker sql mode by default value
WizardXiao Oct 19, 2021
27cbc87
Merge branch 'master' of https://github.com/pingcap/dm into schema_co…
WizardXiao Oct 19, 2021
8eb2546
Merge branch 'master' into schema_compatible
WizardXiao Oct 19, 2021
863fdb9
Merge branch 'schema_compatible' of https://github.com/WizardXiao/dm …
WizardXiao Oct 19, 2021
b0ccf61
Merge branch 'master' of https://github.com/pingcap/dm into schema_co…
WizardXiao Oct 21, 2021
08eca2a
commit-message: fix fail or panic in unit-test
WizardXiao Oct 21, 2021
816abf2
commit-message: update fmt and comment for const
WizardXiao Oct 21, 2021
e47faa2
commit-message: update downstream conn to dbconn
WizardXiao Oct 21, 2021
f6103ca
Merge branch 'master' of https://github.com/pingcap/dm into schema_co…
WizardXiao Oct 21, 2021
aeac93f
commit-message: add sql check by failpoint
WizardXiao Oct 26, 2021
eafa54a
commit-message: merge master
WizardXiao Oct 27, 2021
b13069b
commit-message: merge master
WizardXiao Oct 27, 2021
822e6c7
commit-message: add UT
WizardXiao Oct 27, 2021
783348b
Merge branch 'master' of https://github.com/pingcap/dm into schema_co…
WizardXiao Oct 27, 2021
d49f691
Merge branch 'master' into schema_compatible
WizardXiao Oct 27, 2021
8a529b1
Merge branch 'schema_compatible' of https://github.com/WizardXiao/dm …
WizardXiao Oct 27, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,10 @@ ErrSchemaTrackerInvalidCreateTableStmt,[code=44009:class=schema-tracker:scope=in
ErrSchemaTrackerRestoreStmtFail,[code=44010:class=schema-tracker:scope=internal:level=medium], "Message: fail to restore the statement"
ErrSchemaTrackerCannotDropTable,[code=44011:class=schema-tracker:scope=internal:level=high], "Message: failed to drop table for %v in schema tracker"
ErrSchemaTrackerInit,[code=44012:class=schema-tracker:scope=internal:level=high], "Message: failed to create schema tracker"
ErrSchemaTrackerCannotSetDownstreamSQLMode,[code=44013:class=schema-tracker:scope=internal:level=high], "Message: failed to set default downstream sql_mode %v in schema tracker"
ErrSchemaTrackerCannotInitDownstreamParser,[code=44014:class=schema-tracker:scope=internal:level=high], "Message: failed to init downstream parser by sql_mode %v in schema tracker"
ErrSchemaTrackerCannotMockDownstreamTable,[code=44015:class=schema-tracker:scope=internal:level=high], "Message: failed to mock downstream table by create table statement %v in schema tracker"
ErrSchemaTrackerCannotFetchDownstreamCreateTableStmt,[code=44016:class=schema-tracker:scope=internal:level=high], "Message: failed to fetch downstream table %v by show create table statement in schema tracker"
ErrSchedulerNotStarted,[code=46001:class=scheduler:scope=internal:level=high], "Message: the scheduler has not started"
ErrSchedulerStarted,[code=46002:class=scheduler:scope=internal:level=medium], "Message: the scheduler has already started"
ErrSchedulerWorkerExist,[code=46003:class=scheduler:scope=internal:level=medium], "Message: dm-worker with name %s already exists"
Expand Down
24 changes: 24 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2962,6 +2962,30 @@ description = ""
workaround = ""
tags = ["internal", "high"]

[error.DM-schema-tracker-44013]
message = "failed to set default downstream sql_mode %v in schema tracker"
description = ""
workaround = ""
tags = ["internal", "high"]

[error.DM-schema-tracker-44014]
message = "failed to init downstream parser by sql_mode %v in schema tracker"
description = ""
workaround = ""
tags = ["internal", "high"]

[error.DM-schema-tracker-44015]
message = "failed to mock downstream table by create table statement %v in schema tracker"
description = ""
workaround = ""
tags = ["internal", "high"]

[error.DM-schema-tracker-44016]
message = "failed to fetch downstream table %v by show create table statement in schema tracker"
description = ""
workaround = ""
tags = ["internal", "high"]

[error.DM-scheduler-46001]
message = "the scheduler has not started"
description = ""
Expand Down
278 changes: 268 additions & 10 deletions pkg/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,30 @@ import (
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/mock"
"go.uber.org/zap"

"github.com/pingcap/dm/pkg/conn"
tcontext "github.com/pingcap/dm/pkg/context"
"github.com/pingcap/dm/pkg/log"
dterror "github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
"github.com/pingcap/dm/syncer/dbconn"
)

const (
// TiDBClusteredIndex is the variable name for clustered index.
TiDBClusteredIndex = "tidb_enable_clustered_index"
// downstream mock table id, consists of serial numbers of letters.
mockTableID = 121402101900011104
)

var (
Expand All @@ -54,15 +62,30 @@ var (

// Tracker is used to track schema locally.
type Tracker struct {
store kv.Storage
dom *domain.Domain
se session.Session
store kv.Storage
dom *domain.Domain
se session.Session
dsTracker *downstreamTracker // downstream tracker tableid -> createTableStmt
}

// downstreamTracker tracks downstream schema.
type downstreamTracker struct {
downstreamConn *dbconn.DBConn // downstream connection
stmtParser *parser.Parser // statement parser
tableInfos map[string]*downstreamTableInfo // downstream table infos
}

// downstreamTableInfo contains tableinfo and index cache.
type downstreamTableInfo struct {
tableInfo *model.TableInfo // tableInfo which comes from parse create statement syntaxtree
indexCache *model.IndexInfo // index cache include pk/uk(not null)
availableUKCache []*model.IndexInfo // index cache include uks(data not null)
}

// NewTracker creates a new tracker. `sessionCfg` will be set as tracker's session variables if specified, or retrieve
// some variable from downstream TiDB using `tidbConn`.
// some variable from downstream using `downstreamConn`.
// NOTE **sessionCfg is a reference to caller**.
func NewTracker(ctx context.Context, task string, sessionCfg map[string]string, tidbConn *conn.BaseConn) (*Tracker, error) {
func NewTracker(ctx context.Context, task string, sessionCfg map[string]string, downstreamConn *dbconn.DBConn) (*Tracker, error) {
// NOTE: tidb uses a **global** config so can't isolate tracker's config from each other. If that isolation is needed,
// we might SetGlobalConfig before every call to tracker, or use some patch like https://github.com/bouk/monkey
tidbConfig.UpdateGlobal(func(conf *tidbConfig.Config) {
Expand All @@ -81,7 +104,7 @@ func NewTracker(ctx context.Context, task string, sessionCfg map[string]string,
for _, k := range downstreamVars {
if _, ok := sessionCfg[k]; !ok {
var ignoredColumn interface{}
rows, err2 := tidbConn.QuerySQL(tctx, fmt.Sprintf("SHOW VARIABLES LIKE '%s'", k))
rows, err2 := downstreamConn.QuerySQL(tctx, fmt.Sprintf("SHOW VARIABLES LIKE '%s'", k))
if err2 != nil {
return nil, err2
}
Expand Down Expand Up @@ -155,10 +178,17 @@ func NewTracker(ctx context.Context, task string, sessionCfg map[string]string,
return nil, err
}

// init downstreamTracker
dsTracker := &downstreamTracker{
downstreamConn: downstreamConn,
tableInfos: make(map[string]*downstreamTableInfo),
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
}

return &Tracker{
store: store,
dom: dom,
se: se,
store: store,
dom: dom,
se: se,
dsTracker: dsTracker,
}, nil
}

Expand Down Expand Up @@ -329,3 +359,231 @@ func (tr *Tracker) CreateTableIfNotExists(table *filter.Table, ti *model.TableIn
func (tr *Tracker) GetSystemVar(name string) (string, bool) {
return tr.se.GetSessionVars().GetSystemVar(name)
}

// GetDownStreamIndexInfo gets downstream PK/UK(not null) Index.
// note. this function will init downstreamTrack's table info.
func (tr *Tracker) GetDownStreamIndexInfo(tctx *tcontext.Context, tableID string, originTi *model.TableInfo) (*model.IndexInfo, error) {
dti, ok := tr.dsTracker.tableInfos[tableID]
if !ok {
log.L().Info("Downstream schema tracker init. ", zap.String("tableID", tableID))
ti, err := tr.getTableInfoByCreateStmt(tctx, tableID)
if err != nil {
log.L().Error("Init dowstream schema info error. ", zap.String("tableID", tableID), zap.Error(err))
return nil, err
}

dti = getDownStreamTi(ti, originTi)
tr.dsTracker.tableInfos[tableID] = dti
}
return dti.indexCache, nil
}

// GetAvailableDownStreamUKIndexInfo gets available downstream UK whose data is not null.
// note. this function will not init downstreamTrack.
func (tr *Tracker) GetAvailableDownStreamUKIndexInfo(tableID string, data []interface{}) *model.IndexInfo {
dti, ok := tr.dsTracker.tableInfos[tableID]

if !ok || len(dti.availableUKCache) == 0 {
return nil
}
// func for check data is not null
fn := func(i int) bool {
return data[i] != nil
}

for i, uk := range dti.availableUKCache {
// check uk's column data is not null
if isSpecifiedIndexColumn(uk, fn) {
if i != 0 {
// exchange available uk to the first of the array to reduce judgements for next row
dti.availableUKCache[0], dti.availableUKCache[i] = dti.availableUKCache[i], dti.availableUKCache[0]
}
return uk
}
}
return nil
}

// RemoveDownstreamSchema just remove schema or table in downstreamTrack.
func (tr *Tracker) RemoveDownstreamSchema(targetTables []*filter.Table) {
if len(targetTables) == 0 {
return
}

for _, targetTable := range targetTables {
tableID := utils.GenTableID(targetTable)
_, ok := tr.dsTracker.tableInfos[tableID]
if !ok {
// handle just have schema
if targetTable.Schema != "" && targetTable.Name == "" {
for k := range tr.dsTracker.tableInfos {
if strings.HasPrefix(k, tableID+".") {
delete(tr.dsTracker.tableInfos, k)
log.L().Info("Remove downstream schema tracker", zap.String("tableID", tableID))
}
}
}
} else {
WizardXiao marked this conversation as resolved.
Show resolved Hide resolved
delete(tr.dsTracker.tableInfos, tableID)
log.L().Info("Remove downstream schema tracker", zap.String("tableID", tableID))
}
}
}

// getTableInfoByCreateStmt get downstream tableInfo by "SHOW CREATE TABLE" stmt.
func (tr *Tracker) getTableInfoByCreateStmt(tctx *tcontext.Context, tableID string) (*model.TableInfo, error) {
if tr.dsTracker.stmtParser == nil {
err := tr.initDownStreamSQLModeAndParser(tctx)
if err != nil {
return nil, err
WizardXiao marked this conversation as resolved.
Show resolved Hide resolved
}
}
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved

querySQL := fmt.Sprintf("SHOW CREATE TABLE %s", tableID)
rows, err := tr.dsTracker.downstreamConn.QuerySQL(tctx, querySQL)
if err != nil {
return nil, dterror.ErrSchemaTrackerCannotFetchDownstreamCreateTableStmt.Delegate(err, tableID)
}
defer rows.Close()
var tableName, createStr string
if rows.Next() {
if err = rows.Scan(&tableName, &createStr); err != nil {
return nil, dterror.DBErrorAdapt(rows.Err(), dterror.ErrDBDriverError)
}
}

log.L().Info("Show create table info", zap.String("tableID", tableID), zap.String("create string", createStr))
// parse create table stmt.
stmtNode, err := tr.dsTracker.stmtParser.ParseOneStmt(createStr, "", "")
if err != nil {
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
return nil, dterror.ErrSchemaTrackerInvalidCreateTableStmt.Delegate(err, createStr)
}

ti, err := ddl.MockTableInfo(mock.NewContext(), stmtNode.(*ast.CreateTableStmt), mockTableID)
if err != nil {
return nil, dterror.ErrSchemaTrackerCannotMockDownstreamTable.Delegate(err, createStr)
}
return ti, nil
}

// initDownStreamTrackerParser init downstream tracker parser by default sql_mode.
func (tr *Tracker) initDownStreamSQLModeAndParser(tctx *tcontext.Context) error {
setSQLMode := fmt.Sprintf("SET SESSION SQL_MODE = '%s'", mysql.DefaultSQLMode)
_, err := tr.dsTracker.downstreamConn.ExecuteSQL(tctx, []string{setSQLMode})
if err != nil {
return dterror.ErrSchemaTrackerCannotSetDownstreamSQLMode.Delegate(err, mysql.DefaultSQLMode)
}
stmtParser, err := utils.GetParserFromSQLModeStr(mysql.DefaultSQLMode)
if err != nil {
return dterror.ErrSchemaTrackerCannotInitDownstreamParser.Delegate(err, mysql.DefaultSQLMode)
}
tr.dsTracker.stmtParser = stmtParser
return nil
}

// getDownStreamTi constructs downstreamTable index cache by tableinfo.
func getDownStreamTi(ti *model.TableInfo, originTi *model.TableInfo) *downstreamTableInfo {
var (
indexCache *model.IndexInfo
availableUKCache = make([]*model.IndexInfo, 0, len(ti.Indices))
hasPk = false
)

// func for check not null constraint
fn := func(i int) bool {
return mysql.HasNotNullFlag(ti.Columns[i].Flag)
}

for _, idx := range ti.Indices {
indexRedict := redirectIndexKeys(idx, originTi)
if indexRedict == nil {
continue
}
if idx.Primary {
indexCache = indexRedict
hasPk = true
} else if idx.Unique {
// second check not null unique key
if !hasPk && isSpecifiedIndexColumn(idx, fn) {
indexCache = indexRedict
} else {
availableUKCache = append(availableUKCache, indexRedict)
}
}
}

// handle pk exceptional case.
// e.g. "create table t(a int primary key, b int)".
if !hasPk {
exPk := redirectIndexKeys(handlePkExCase(ti), originTi)
if exPk != nil {
indexCache = exPk
}
}

return &downstreamTableInfo{
tableInfo: ti,
indexCache: indexCache,
availableUKCache: availableUKCache,
}
}

// redirectIndexKeys redirect index's columns offset in origin tableinfo.
func redirectIndexKeys(index *model.IndexInfo, originTi *model.TableInfo) *model.IndexInfo {
if index == nil || originTi == nil {
return nil
}

columns := make([]*model.IndexColumn, 0, len(index.Columns))
for _, key := range index.Columns {
if originColumn := model.FindColumnInfo(originTi.Columns, key.Name.O); originColumn != nil {
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
column := &model.IndexColumn{
Name: key.Name,
Offset: originColumn.Offset,
Length: key.Length,
}
columns = append(columns, column)
}
}
if len(columns) == len(index.Columns) {
return &model.IndexInfo{
Table: index.Table,
Unique: index.Unique,
Primary: index.Primary,
State: index.State,
Tp: index.Tp,
Columns: columns,
}
}
return nil
}

// handlePkExCase is handle pk exceptional case.
// e.g. "create table t(a int primary key, b int)".
func handlePkExCase(ti *model.TableInfo) *model.IndexInfo {
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
if pk := ti.GetPkColInfo(); pk != nil {
return &model.IndexInfo{
Table: ti.Name,
Unique: true,
Primary: true,
State: model.StatePublic,
Tp: model.IndexTypeBtree,
Columns: []*model.IndexColumn{{
Name: pk.Name,
Offset: pk.Offset,
Length: types.UnspecifiedLength,
}},
}
}
return nil
}

// isSpecifiedIndexColumn checks all of index's columns are matching 'fn'.
func isSpecifiedIndexColumn(index *model.IndexInfo, fn func(i int) bool) bool {
for _, col := range index.Columns {
if !fn(col.Offset) {
return false
}
}
return true
}
Loading