Skip to content

Commit

Permalink
Fix affect of online DDL (#466)
Browse files Browse the repository at this point in the history
see issue pingcap/tidb#9304

skip the truncated table DML data.
refactor some translator code in ./drainer
add some ddl test case
change tests/run.sh to be easy start multi instance TiDB for test
  • Loading branch information
july2993 committed Feb 27, 2019
1 parent 045d968 commit 86504cb
Show file tree
Hide file tree
Showing 16 changed files with 529 additions and 251 deletions.
49 changes: 32 additions & 17 deletions drainer/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ type Schema struct {
schemas map[int64]*model.DBInfo
tables map[int64]*model.TableInfo

truncateTableID map[int64]struct{}

schemaMetaVersion int64

hasImplicitCol bool
Expand All @@ -41,6 +43,7 @@ func NewSchema(jobs []*model.Job, hasImplicitCol bool) (*Schema, error) {
s := &Schema{
hasImplicitCol: hasImplicitCol,
version2SchemaTable: make(map[int64]TableName),
truncateTableID: make(map[int64]struct{}),
jobs: jobs,
}

Expand Down Expand Up @@ -253,12 +256,12 @@ func (s *Schema) handlePreviousDDLJobIfNeed(version int64) error {
// the second value[string]: the table name
// the third value[string]: the sql that is corresponding to the job
// the fourth value[error]: the handleDDL execution's err
func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) {
func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string, sql string, err error) {
if skipJob(job) {
return "", "", "", nil
}

sql := job.Query
sql = job.Query
if sql == "" {
return "", "", "", errors.Errorf("[ddl job sql miss]%+v", job)
}
Expand All @@ -275,17 +278,16 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) {

s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schema.Name.O, ""}
s.currentVersion = job.BinlogInfo.SchemaVersion
return schema.Name.O, "", sql, nil
schemaName = schema.Name.O

case model.ActionDropSchema:
schemaName, err := s.DropSchema(job.SchemaID)
schemaName, err = s.DropSchema(job.SchemaID)
if err != nil {
return "", "", "", errors.Trace(err)
}

s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schemaName, ""}
s.currentVersion = job.BinlogInfo.SchemaVersion
return schemaName, "", sql, nil

case model.ActionRenameTable:
// ignore schema doesn't support reanme ddl
Expand All @@ -312,7 +314,8 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) {

s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schema.Name.O, table.Name.O}
s.currentVersion = job.BinlogInfo.SchemaVersion
return schema.Name.O, table.Name.O, sql, nil
schemaName = schema.Name.O
tableName = table.Name.O

case model.ActionCreateTable:
table := job.BinlogInfo.TableInfo
Expand All @@ -332,29 +335,31 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) {

s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schema.Name.O, table.Name.O}
s.currentVersion = job.BinlogInfo.SchemaVersion
return schema.Name.O, table.Name.O, sql, nil
schemaName = schema.Name.O
tableName = table.Name.O

case model.ActionDropTable:
schema, ok := s.SchemaByID(job.SchemaID)
if !ok {
return "", "", "", errors.NotFoundf("schema %d", job.SchemaID)
}

tableName, err := s.DropTable(job.TableID)
tableName, err = s.DropTable(job.TableID)
if err != nil {
return "", "", "", errors.Trace(err)
}

s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schema.Name.O, tableName}
s.currentVersion = job.BinlogInfo.SchemaVersion
return schema.Name.O, tableName, sql, nil
schemaName = schema.Name.O

case model.ActionTruncateTable:
schema, ok := s.SchemaByID(job.SchemaID)
if !ok {
return "", "", "", errors.NotFoundf("schema %d", job.SchemaID)
}

// job.TableID is the old table id, different from table.ID
_, err := s.DropTable(job.TableID)
if err != nil {
return "", "", "", errors.Trace(err)
Expand All @@ -372,7 +377,9 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) {

s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schema.Name.O, table.Name.O}
s.currentVersion = job.BinlogInfo.SchemaVersion
return schema.Name.O, table.Name.O, sql, nil
schemaName = schema.Name.O
tableName = table.Name.O
s.truncateTableID[job.TableID] = struct{}{}

default:
binlogInfo := job.BinlogInfo
Expand All @@ -396,8 +403,17 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) {

s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schema.Name.O, tbInfo.Name.O}
s.currentVersion = job.BinlogInfo.SchemaVersion
return schema.Name.O, tbInfo.Name.O, sql, nil
schemaName = schema.Name.O
tableName = tbInfo.Name.O
}

return
}

// IsTruncateTableID returns true if the table id have been truncated by truncate table DDL
func (s *Schema) IsTruncateTableID(id int64) bool {
_, ok := s.truncateTableID[id]
return ok
}

func (s *Schema) getSchemaTableAndDelete(version int64) (string, string, error) {
Expand Down Expand Up @@ -425,11 +441,10 @@ func addImplicitColumn(table *model.TableInfo) {
table.Indices = []*model.IndexInfo{newIndex}
}

// there's only two status will be in HistoryDDLJob(we fetch at start time):
// JobStateSynced and JobStateRollbackDone
// If it fail to commit(to tikv) in 2pc phrase (when changing JobStateDone -> JobStateSynced and add to HistoryDDLJob),
// then is would't not be add to HistoryDDLJob, and we may get (prewrite + rollback binlog),
// this binlog event would reach drainer, finally we will get a (p + commit binlog) when tidb retry and successfully commit
// TiDB write DDL Binlog for every DDL Job, we must ignore jobs that are cancelled or rollback
// For older version TiDB, it write DDL Binlog in the txn that the state of job is changed to *synced*
// Now, it write DDL Binlog in the txn that the state of job is changed to *done* (before change to *synced*)
// At state *done*, it will be always and only changed to *synced*.
func skipJob(job *model.Job) bool {
return !job.IsSynced()
return !job.IsSynced() && !job.IsDone()
}
38 changes: 35 additions & 3 deletions drainer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@ type Syncer struct {
positions map[string]int64
initCommitTS int64

// because TiDB is case-insensitive, only lower-case here.
ignoreSchemaNames map[string]struct{}

ctx context.Context
cancel context.CancelFunc

Expand Down Expand Up @@ -418,6 +415,9 @@ func (s *Syncer) run(jobs []*model.Job) error {
}
}
s.schema, err = NewSchema(jobs, false)
if err != nil {
return errors.Trace(err)
}

s.executors, err = createExecutors(s.cfg.DestDBType, s.cfg.To, s.cfg.WorkerCount)
if err != nil {
Expand All @@ -436,6 +436,7 @@ func (s *Syncer) run(jobs []*model.Job) error {
go s.sync(s.executors[i], s.jobCh[i], i)
}

var lastDDLSchemaVersion int64
var b *binlogItem
for {
select {
Expand Down Expand Up @@ -463,7 +464,16 @@ func (s *Syncer) run(jobs []*model.Job) error {
return errors.Errorf("prewrite %s unmarshal error %v", preWriteValue, err)
}

err = s.rewriteForOldVersion(preWrite)
if err != nil {
return errors.Annotate(err, "rewrite for old version fail")
}

log.Debug("DML SchemaVersion: ", preWrite.SchemaVersion)
if preWrite.SchemaVersion < lastDDLSchemaVersion {
log.Debug("encounter older schema dml")
}

err = s.schema.handlePreviousDDLJobIfNeed(preWrite.SchemaVersion)
if err != nil {
return errors.Trace(err)
Expand All @@ -481,6 +491,8 @@ func (s *Syncer) run(jobs []*model.Job) error {
s.schema.addJob(b.job)

log.Debug("DDL SchemaVersion: ", b.job.BinlogInfo.SchemaVersion)
lastDDLSchemaVersion = b.job.BinlogInfo.SchemaVersion

err = s.schema.handlePreviousDDLJobIfNeed(b.job.BinlogInfo.SchemaVersion)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -645,3 +657,23 @@ func (s *Syncer) GetLastSyncTime() time.Time {
func (s *Syncer) GetLatestCommitTS() int64 {
return s.cp.TS()
}

// see https://github.com/pingcap/tidb/issues/9304
// currently, we only drop the data which table id is truncated.
// because of online DDL, different TiDB instance may see the different schema,
// it can't be treated simply as one timeline consider both DML and DDL,
// we must carefully handle every DDL type now and need to find a better design.
func (s *Syncer) rewriteForOldVersion(pv *pb.PrewriteValue) (err error) {
var mutations = make([]pb.TableMutation, 0, len(pv.GetMutations()))
for _, mutation := range pv.GetMutations() {
if s.schema.IsTruncateTableID(mutation.TableId) {
log.Infof("skip old version truncate dml, table id: %d", mutation.TableId)
continue
}

mutations = append(mutations, mutation)
}
pv.Mutations = mutations

return nil
}
4 changes: 2 additions & 2 deletions drainer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (t *testDrainerSuite) TestHandleDDL(c *C) {
c.Assert(sql, Equals, "")

// check job.Query is empty
job = &model.Job{ID: 1, State: model.JobStateSynced}
job = &model.Job{ID: 1, State: model.JobStateDone}
_, _, sql, err = s.schema.handleDDL(job)
c.Assert(sql, Equals, "")
c.Assert(err, NotNil, Commentf("should return not found job.Query"))
Expand Down Expand Up @@ -82,7 +82,7 @@ func (t *testDrainerSuite) TestHandleDDL(c *C) {

job = &model.Job{
ID: testCase.jobID,
State: model.JobStateSynced,
State: model.JobStateDone,
SchemaID: testCase.schemaID,
TableID: testCase.tableID,
Type: testCase.jobType,
Expand Down
26 changes: 3 additions & 23 deletions drainer/translator/flash.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
)

// flashTranslator translates TiDB binlog to flash sqls
Expand All @@ -42,45 +41,26 @@ func (f *flashTranslator) GenInsertSQLs(schema string, table *model.TableInfo, r
version := makeInternalVersionValue(uint64(commitTS))
delFlag := makeInternalDelmarkValue(false)

colsTypeMap := toFlashColumnTypeMap(columns)
columnList := genColumnList(columns)
// addition 2 holder is for del flag and version
columnPlaceholders := dml.GenColumnPlaceholders(len(columns) + 2)
sql := fmt.Sprintf("IMPORT INTO `%s`.`%s` (%s) values (%s);", schema, table.Name.L, columnList, columnPlaceholders)

for _, row := range rows {
//decode the pk value
remain, pk, err := codec.DecodeOne(row)
hashKey := pk.GetInt64()
pk, columnValues, err := insertRowToDatums(table, row)
if err != nil {
return nil, nil, nil, errors.Trace(err)
}

columnValues, err := tablecodec.DecodeRow(remain, colsTypeMap, gotime.Local)
if err != nil {
return nil, nil, nil, errors.Trace(err)
}

if columnValues == nil {
columnValues = make(map[int64]types.Datum)
}
hashKey := pk.GetInt64()

var vals []interface{}
vals = append(vals, hashKey)
for _, col := range columns {
if IsPKHandleColumn(table, col) {
columnValues[col.ID] = pk
pkVal, err := formatFlashData(&pk, &col.FieldType)
if err != nil {
return nil, nil, nil, errors.Trace(err)
}
vals = append(vals, pkVal)
continue
}

val, ok := columnValues[col.ID]
if !ok {
vals = append(vals, col.DefaultValue)
vals = append(vals, col.GetDefaultValue())
} else {
value, err := formatFlashData(&val, &col.FieldType)
if err != nil {
Expand Down
9 changes: 0 additions & 9 deletions drainer/translator/flash_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,6 @@ func isHandleTypeColumn(colDef *ast.ColumnDef) bool {
tp == mysql.TypeLonglong
}

func toFlashColumnTypeMap(columns []*model.ColumnInfo) map[int64]*types.FieldType {
colTypeMap := make(map[int64]*types.FieldType)
for _, col := range columns {
colTypeMap[col.ID] = &col.FieldType
}

return colTypeMap
}

func makeRow(pk int64, values []interface{}, version uint64, delFlag uint8) []interface{} {
var row []interface{}
row = append(row, pk)
Expand Down
Loading

0 comments on commit 86504cb

Please sign in to comment.