Skip to content

Commit

Permalink
Fix affect of online DDL
Browse files Browse the repository at this point in the history
  • Loading branch information
july2993 committed Feb 15, 2019
1 parent 535e6e2 commit 4a87b16
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 38 deletions.
47 changes: 31 additions & 16 deletions drainer/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ type Schema struct {
schemas map[int64]*model.DBInfo
tables map[int64]*model.TableInfo

truncateTableID map[int64]struct{}

schemaMetaVersion int64

hasImplicitCol bool
Expand All @@ -41,6 +43,7 @@ func NewSchema(jobs []*model.Job, hasImplicitCol bool) (*Schema, error) {
s := &Schema{
hasImplicitCol: hasImplicitCol,
version2SchemaTable: make(map[int64]TableName),
truncateTableID: make(map[int64]struct{}),
jobs: jobs,
}

Expand Down Expand Up @@ -253,12 +256,12 @@ func (s *Schema) handlePreviousDDLJobIfNeed(version int64) error {
// the second value[string]: the table name
// the third value[string]: the sql that is corresponding to the job
// the fourth value[error]: the handleDDL execution's err
func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) {
func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string, sql string, err error) {
if skipJob(job) {
return "", "", "", nil
}

sql := job.Query
sql = job.Query
if sql == "" {
return "", "", "", errors.Errorf("[ddl job sql miss]%+v", job)
}
Expand All @@ -275,17 +278,16 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) {

s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schema.Name.O, ""}
s.currentVersion = job.BinlogInfo.SchemaVersion
return schema.Name.O, "", sql, nil
schemaName = schema.Name.O

case model.ActionDropSchema:
schemaName, err := s.DropSchema(job.SchemaID)
schemaName, err = s.DropSchema(job.SchemaID)
if err != nil {
return "", "", "", errors.Trace(err)
}

s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schemaName, ""}
s.currentVersion = job.BinlogInfo.SchemaVersion
return schemaName, "", sql, nil

case model.ActionRenameTable:
// ignore schema doesn't support reanme ddl
Expand All @@ -312,7 +314,8 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) {

s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schema.Name.O, table.Name.O}
s.currentVersion = job.BinlogInfo.SchemaVersion
return schema.Name.O, table.Name.O, sql, nil
schemaName = schema.Name.O
tableName = table.Name.O

case model.ActionCreateTable:
table := job.BinlogInfo.TableInfo
Expand All @@ -332,7 +335,8 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) {

s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schema.Name.O, table.Name.O}
s.currentVersion = job.BinlogInfo.SchemaVersion
return schema.Name.O, table.Name.O, sql, nil
schemaName = schema.Name.O
tableName = table.Name.O

case model.ActionDropTable:
schema, ok := s.SchemaByID(job.SchemaID)
Expand All @@ -347,14 +351,16 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) {

s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schema.Name.O, tableName}
s.currentVersion = job.BinlogInfo.SchemaVersion
return schema.Name.O, tableName, sql, nil
schemaName = schema.Name.O
tableName = tableName

case model.ActionTruncateTable:
schema, ok := s.SchemaByID(job.SchemaID)
if !ok {
return "", "", "", errors.NotFoundf("schema %d", job.SchemaID)
}

// job.TableID is the old table id, different from table.ID
_, err := s.DropTable(job.TableID)
if err != nil {
return "", "", "", errors.Trace(err)
Expand All @@ -372,7 +378,9 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) {

s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schema.Name.O, table.Name.O}
s.currentVersion = job.BinlogInfo.SchemaVersion
return schema.Name.O, table.Name.O, sql, nil
schemaName = schema.Name.O
tableName = table.Name.O
s.truncateTableID[job.TableID] = struct{}{}

default:
binlogInfo := job.BinlogInfo
Expand All @@ -396,8 +404,17 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) {

s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schema.Name.O, tbInfo.Name.O}
s.currentVersion = job.BinlogInfo.SchemaVersion
return schema.Name.O, tbInfo.Name.O, sql, nil
schemaName = schema.Name.O
tableName = tbInfo.Name.O
}

return
}

// IsTruncateTableID return if the table id is truncate by truncate table DDL
func (s *Schema) IsTruncateTableID(id int64) bool {
_, ok := s.truncateTableID[id]
return ok
}

func (s *Schema) getSchemaTableAndDelete(version int64) (string, string, error) {
Expand Down Expand Up @@ -425,11 +442,9 @@ func addImplicitColumn(table *model.TableInfo) {
table.Indices = []*model.IndexInfo{newIndex}
}

// there's only two status will be in HistoryDDLJob(we fetch at start time):
// JobStateSynced and JobStateRollbackDone
// If it fail to commit(to tikv) in 2pc phrase (when changing JobStateDone -> JobStateSynced and add to HistoryDDLJob),
// then is would't not be add to HistoryDDLJob, and we may get (prewrite + rollback binlog),
// this binlog event would reach drainer, finally we will get a (p + commit binlog) when tidb retry and successfully commit
// TiDB write DDL Binlog for every DDL Job, we must ignore jobs that are cancelled or rollback
// For older version TiDB, it write DDL Binlog in the txn that the state of job is changed to *synced*
// Now, it write DDL Binlog in the txn that the state of job is changed to *done* (before change to *synced*)
func skipJob(job *model.Job) bool {
return !job.IsSynced()
return !job.IsSynced() && !job.IsDone()
}
27 changes: 24 additions & 3 deletions drainer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@ type Syncer struct {
positions map[string]int64
initCommitTS int64

// because TiDB is case-insensitive, only lower-case here.
ignoreSchemaNames map[string]struct{}

ctx context.Context
cancel context.CancelFunc

Expand Down Expand Up @@ -418,6 +415,9 @@ func (s *Syncer) run(jobs []*model.Job) error {
}
}
s.schema, err = NewSchema(jobs, false)
if err != nil {
return errors.Trace(err)
}

s.executors, err = createExecutors(s.cfg.DestDBType, s.cfg.To, s.cfg.WorkerCount)
if err != nil {
Expand Down Expand Up @@ -463,6 +463,11 @@ func (s *Syncer) run(jobs []*model.Job) error {
return errors.Errorf("prewrite %s unmarshal error %v", preWriteValue, err)
}

err = s.rewriteForOldVersion(preWrite)
if err != nil {
return errors.Annotate(err, "rewrite for old version fail")
}

log.Debug("DML SchemaVersion: ", preWrite.SchemaVersion)
err = s.schema.handlePreviousDDLJobIfNeed(preWrite.SchemaVersion)
if err != nil {
Expand Down Expand Up @@ -645,3 +650,19 @@ func (s *Syncer) GetLastSyncTime() time.Time {
func (s *Syncer) GetLatestCommitTS() int64 {
return s.cp.TS()
}

// https://github.com/pingcap/tidb/issues/9304
func (s *Syncer) rewriteForOldVersion(pv *pb.PrewriteValue) (err error) {
var mutations []pb.TableMutation
for _, mutation := range pv.GetMutations() {
if s.schema.IsTruncateTableID(mutation.TableId) {
log.Infof("skip old version truncate dml, table id: %d", mutation.TableId)
continue
}

mutations = append(mutations, mutation)
}
pv.Mutations = mutations

return nil
}
43 changes: 24 additions & 19 deletions tests/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,29 @@ stop_services() {
killall -9 drainer || true
}

start_upstream_tidb() {
port=${1-4000}
echo "Starting TiDB at port: $port..."
tidb-server \
-P $port \
--store tikv \
--path 127.0.0.1:2379 \
--enable-binlog=true \
--log-file "$OUT_DIR/tidb.log" &

echo "Verifying TiDB is started..."
i=0
while ! mysql -uroot -h127.0.0.1 -P$port --default-character-set utf8 -e 'select * from mysql.tidb;'; do
i=$((i+1))
if [ "$i" -gt 40 ]; then
echo 'Failed to start TiDB'
exit 1
fi
sleep 3
done

}

start_services() {
stop_services
clean_data
Expand Down Expand Up @@ -71,25 +94,7 @@ EOF

sleep 5


echo "Starting TiDB..."
tidb-server \
-P 4000 \
--store tikv \
--path 127.0.0.1:2379 \
--enable-binlog=true \
--log-file "$OUT_DIR/tidb.log" &

echo "Verifying TiDB is started..."
i=0
while ! mysql -uroot -h127.0.0.1 -P4000 --default-character-set utf8 -e 'select * from mysql.tidb;'; do
i=$((i+1))
if [ "$i" -gt 40 ]; then
echo 'Failed to start TiDB'
exit 1
fi
sleep 3
done
start_upstream_tidb 4000

echo "Starting Downstream TiDB..."
tidb-server \
Expand Down

0 comments on commit 4a87b16

Please sign in to comment.