diff --git a/drainer/schema.go b/drainer/schema.go index 09a985ea5..a8b18c795 100644 --- a/drainer/schema.go +++ b/drainer/schema.go @@ -21,6 +21,8 @@ type Schema struct { schemas map[int64]*model.DBInfo tables map[int64]*model.TableInfo + truncateTableID map[int64]struct{} + schemaMetaVersion int64 hasImplicitCol bool @@ -41,6 +43,7 @@ func NewSchema(jobs []*model.Job, hasImplicitCol bool) (*Schema, error) { s := &Schema{ hasImplicitCol: hasImplicitCol, version2SchemaTable: make(map[int64]TableName), + truncateTableID: make(map[int64]struct{}), jobs: jobs, } @@ -253,12 +256,12 @@ func (s *Schema) handlePreviousDDLJobIfNeed(version int64) error { // the second value[string]: the table name // the third value[string]: the sql that is corresponding to the job // the fourth value[error]: the handleDDL execution's err -func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) { +func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string, sql string, err error) { if skipJob(job) { return "", "", "", nil } - sql := job.Query + sql = job.Query if sql == "" { return "", "", "", errors.Errorf("[ddl job sql miss]%+v", job) } @@ -275,17 +278,16 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) { s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schema.Name.O, ""} s.currentVersion = job.BinlogInfo.SchemaVersion - return schema.Name.O, "", sql, nil + schemaName = schema.Name.O case model.ActionDropSchema: - schemaName, err := s.DropSchema(job.SchemaID) + schemaName, err = s.DropSchema(job.SchemaID) if err != nil { return "", "", "", errors.Trace(err) } s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schemaName, ""} s.currentVersion = job.BinlogInfo.SchemaVersion - return schemaName, "", sql, nil case model.ActionRenameTable: // ignore schema doesn't support reanme ddl @@ -312,7 +314,8 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) { s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schema.Name.O, table.Name.O} s.currentVersion = job.BinlogInfo.SchemaVersion - return schema.Name.O, table.Name.O, sql, nil + schemaName = schema.Name.O + tableName = table.Name.O case model.ActionCreateTable: table := job.BinlogInfo.TableInfo @@ -332,7 +335,8 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) { s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schema.Name.O, table.Name.O} s.currentVersion = job.BinlogInfo.SchemaVersion - return schema.Name.O, table.Name.O, sql, nil + schemaName = schema.Name.O + tableName = table.Name.O case model.ActionDropTable: schema, ok := s.SchemaByID(job.SchemaID) @@ -347,7 +351,8 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) { s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schema.Name.O, tableName} s.currentVersion = job.BinlogInfo.SchemaVersion - return schema.Name.O, tableName, sql, nil + schemaName = schema.Name.O + tableName = tableName case model.ActionTruncateTable: schema, ok := s.SchemaByID(job.SchemaID) @@ -355,6 +360,7 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) { return "", "", "", errors.NotFoundf("schema %d", job.SchemaID) } + // job.TableID is the old table id, different from table.ID _, err := s.DropTable(job.TableID) if err != nil { return "", "", "", errors.Trace(err) @@ -372,7 +378,9 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) { s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schema.Name.O, table.Name.O} s.currentVersion = job.BinlogInfo.SchemaVersion - return schema.Name.O, table.Name.O, sql, nil + schemaName = schema.Name.O + tableName = table.Name.O + s.truncateTableID[job.TableID] = struct{}{} default: binlogInfo := job.BinlogInfo @@ -396,8 +404,17 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) { s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schema.Name.O, tbInfo.Name.O} s.currentVersion = job.BinlogInfo.SchemaVersion - return schema.Name.O, tbInfo.Name.O, sql, nil + schemaName = schema.Name.O + tableName = tbInfo.Name.O } + + return +} + +// IsTruncateTableID return if the table id is truncate by truncate table DDL +func (s *Schema) IsTruncateTableID(id int64) bool { + _, ok := s.truncateTableID[id] + return ok } func (s *Schema) getSchemaTableAndDelete(version int64) (string, string, error) { @@ -425,11 +442,9 @@ func addImplicitColumn(table *model.TableInfo) { table.Indices = []*model.IndexInfo{newIndex} } -// there's only two status will be in HistoryDDLJob(we fetch at start time): -// JobStateSynced and JobStateRollbackDone -// If it fail to commit(to tikv) in 2pc phrase (when changing JobStateDone -> JobStateSynced and add to HistoryDDLJob), -// then is would't not be add to HistoryDDLJob, and we may get (prewrite + rollback binlog), -// this binlog event would reach drainer, finally we will get a (p + commit binlog) when tidb retry and successfully commit +// TiDB write DDL Binlog for every DDL Job, we must ignore jobs that are cancelled or rollback +// For older version TiDB, it write DDL Binlog in the txn that the state of job is changed to *synced* +// Now, it write DDL Binlog in the txn that the state of job is changed to *done* (before change to *synced*) func skipJob(job *model.Job) bool { - return !job.IsSynced() + return !job.IsSynced() && !job.IsDone() } diff --git a/drainer/syncer.go b/drainer/syncer.go index 62490c1f6..a0b2f415a 100644 --- a/drainer/syncer.go +++ b/drainer/syncer.go @@ -48,9 +48,6 @@ type Syncer struct { positions map[string]int64 initCommitTS int64 - // because TiDB is case-insensitive, only lower-case here. - ignoreSchemaNames map[string]struct{} - ctx context.Context cancel context.CancelFunc @@ -418,6 +415,9 @@ func (s *Syncer) run(jobs []*model.Job) error { } } s.schema, err = NewSchema(jobs, false) + if err != nil { + return errors.Trace(err) + } s.executors, err = createExecutors(s.cfg.DestDBType, s.cfg.To, s.cfg.WorkerCount) if err != nil { @@ -463,6 +463,11 @@ func (s *Syncer) run(jobs []*model.Job) error { return errors.Errorf("prewrite %s unmarshal error %v", preWriteValue, err) } + err = s.rewriteForOldVersion(preWrite) + if err != nil { + return errors.Annotate(err, "rewrite for old version fail") + } + log.Debug("DML SchemaVersion: ", preWrite.SchemaVersion) err = s.schema.handlePreviousDDLJobIfNeed(preWrite.SchemaVersion) if err != nil { @@ -645,3 +650,19 @@ func (s *Syncer) GetLastSyncTime() time.Time { func (s *Syncer) GetLatestCommitTS() int64 { return s.cp.TS() } + +// https://github.com/pingcap/tidb/issues/9304 +func (s *Syncer) rewriteForOldVersion(pv *pb.PrewriteValue) (err error) { + var mutations []pb.TableMutation + for _, mutation := range pv.GetMutations() { + if s.schema.IsTruncateTableID(mutation.TableId) { + log.Infof("skip old version truncate dml, table id: %d", mutation.TableId) + continue + } + + mutations = append(mutations, mutation) + } + pv.Mutations = mutations + + return nil +} diff --git a/tests/run.sh b/tests/run.sh index 855aeafa8..6c172ef09 100755 --- a/tests/run.sh +++ b/tests/run.sh @@ -31,6 +31,29 @@ stop_services() { killall -9 drainer || true } +start_upstream_tidb() { + port=${1-4000} + echo "Starting TiDB at port: $port..." + tidb-server \ + -P $port \ + --store tikv \ + --path 127.0.0.1:2379 \ + --enable-binlog=true \ + --log-file "$OUT_DIR/tidb.log" & + + echo "Verifying TiDB is started..." + i=0 + while ! mysql -uroot -h127.0.0.1 -P$port --default-character-set utf8 -e 'select * from mysql.tidb;'; do + i=$((i+1)) + if [ "$i" -gt 40 ]; then + echo 'Failed to start TiDB' + exit 1 + fi + sleep 3 + done + +} + start_services() { stop_services clean_data @@ -71,25 +94,7 @@ EOF sleep 5 - - echo "Starting TiDB..." - tidb-server \ - -P 4000 \ - --store tikv \ - --path 127.0.0.1:2379 \ - --enable-binlog=true \ - --log-file "$OUT_DIR/tidb.log" & - - echo "Verifying TiDB is started..." - i=0 - while ! mysql -uroot -h127.0.0.1 -P4000 --default-character-set utf8 -e 'select * from mysql.tidb;'; do - i=$((i+1)) - if [ "$i" -gt 40 ]; then - echo 'Failed to start TiDB' - exit 1 - fi - sleep 3 - done + start_upstream_tidb 4000 echo "Starting Downstream TiDB..." tidb-server \