Fix effect of online DDL
See issue pingcap/tidb#9304: we drop the DML data of truncated tables.
july2993 committed Feb 24, 2019
1 parent 4de3fff commit 6b29d3b
Showing 13 changed files with 409 additions and 101 deletions.
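The gist of the fix, before the file-by-file diff: because of the online DDL behavior described in pingcap/tidb#9304, a TiDB server still running an older schema version can keep producing DML binlogs that target the old table id of a table that has just been truncated. The drainer therefore records the old table id of every truncate-table DDL job and drops any mutation that still refers to it. Below is a minimal, self-contained Go sketch of that idea; TableMutation, PrewriteValue and Schema are simplified stand-ins for the real drainer/binlog types, reduced to just the fields this commit touches.

package main

import "fmt"

// TableMutation and PrewriteValue are simplified stand-ins for the
// binlog protobuf types; only the fields relevant here are kept.
type TableMutation struct{ TableID int64 }
type PrewriteValue struct{ Mutations []TableMutation }

// Schema is reduced to the bookkeeping this commit introduces: the
// set of (old) table IDs that were dropped by TRUNCATE TABLE.
type Schema struct{ truncateTableID map[int64]struct{} }

// markTruncated records the old table id when a truncate-table DDL
// job is handled (job.TableID, not the id of the new table).
func (s *Schema) markTruncated(oldTableID int64) {
	s.truncateTableID[oldTableID] = struct{}{}
}

// IsTruncateTableID reports whether id belongs to a truncated table,
// mirroring the method added to drainer/schema.go below.
func (s *Schema) IsTruncateTableID(id int64) bool {
	_, ok := s.truncateTableID[id]
	return ok
}

// rewriteForOldVersion drops mutations that target a truncated table
// id, mirroring the helper added to drainer/syncer.go below.
func rewriteForOldVersion(s *Schema, pv *PrewriteValue) {
	var kept []TableMutation
	for _, m := range pv.Mutations {
		if s.IsTruncateTableID(m.TableID) {
			// an old-version TiDB may still emit DML for the dropped id
			continue
		}
		kept = append(kept, m)
	}
	pv.Mutations = kept
}

func main() {
	s := &Schema{truncateTableID: make(map[int64]struct{})}
	s.markTruncated(45) // TRUNCATE TABLE replaced table id 45 with a new id

	pv := &PrewriteValue{Mutations: []TableMutation{{TableID: 45}, {TableID: 46}}}
	rewriteForOldVersion(s, pv)
	fmt.Println(len(pv.Mutations)) // 1: only the mutation for table id 46 survives
}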
49 changes: 32 additions & 17 deletions drainer/schema.go
@@ -21,6 +21,8 @@ type Schema struct {
schemas map[int64]*model.DBInfo
tables map[int64]*model.TableInfo

truncateTableID map[int64]struct{}

schemaMetaVersion int64

hasImplicitCol bool
@@ -41,6 +43,7 @@ func NewSchema(jobs []*model.Job, hasImplicitCol bool) (*Schema, error) {
s := &Schema{
hasImplicitCol: hasImplicitCol,
version2SchemaTable: make(map[int64]TableName),
truncateTableID: make(map[int64]struct{}),
jobs: jobs,
}

@@ -253,12 +256,12 @@ func (s *Schema) handlePreviousDDLJobIfNeed(version int64) error {
// the second value[string]: the table name
// the third value[string]: the sql corresponding to the job
// the fourth value[error]: the handleDDL execution's err
func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) {
func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string, sql string, err error) {
if skipJob(job) {
return "", "", "", nil
}

sql := job.Query
sql = job.Query
if sql == "" {
return "", "", "", errors.Errorf("[ddl job sql miss]%+v", job)
}
@@ -275,17 +278,16 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) {

s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schema.Name.O, ""}
s.currentVersion = job.BinlogInfo.SchemaVersion
return schema.Name.O, "", sql, nil
schemaName = schema.Name.O

case model.ActionDropSchema:
schemaName, err := s.DropSchema(job.SchemaID)
schemaName, err = s.DropSchema(job.SchemaID)
if err != nil {
return "", "", "", errors.Trace(err)
}

s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schemaName, ""}
s.currentVersion = job.BinlogInfo.SchemaVersion
return schemaName, "", sql, nil

case model.ActionRenameTable:
// an ignored schema doesn't support the rename DDL
@@ -312,7 +314,8 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) {

s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schema.Name.O, table.Name.O}
s.currentVersion = job.BinlogInfo.SchemaVersion
return schema.Name.O, table.Name.O, sql, nil
schemaName = schema.Name.O
tableName = table.Name.O

case model.ActionCreateTable:
table := job.BinlogInfo.TableInfo
@@ -332,29 +335,31 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) {

s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schema.Name.O, table.Name.O}
s.currentVersion = job.BinlogInfo.SchemaVersion
return schema.Name.O, table.Name.O, sql, nil
schemaName = schema.Name.O
tableName = table.Name.O

case model.ActionDropTable:
schema, ok := s.SchemaByID(job.SchemaID)
if !ok {
return "", "", "", errors.NotFoundf("schema %d", job.SchemaID)
}

tableName, err := s.DropTable(job.TableID)
tableName, err = s.DropTable(job.TableID)
if err != nil {
return "", "", "", errors.Trace(err)
}

s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schema.Name.O, tableName}
s.currentVersion = job.BinlogInfo.SchemaVersion
return schema.Name.O, tableName, sql, nil
schemaName = schema.Name.O

case model.ActionTruncateTable:
schema, ok := s.SchemaByID(job.SchemaID)
if !ok {
return "", "", "", errors.NotFoundf("schema %d", job.SchemaID)
}

// job.TableID is the old table id, different from table.ID
_, err := s.DropTable(job.TableID)
if err != nil {
return "", "", "", errors.Trace(err)
@@ -372,7 +377,9 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) {

s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schema.Name.O, table.Name.O}
s.currentVersion = job.BinlogInfo.SchemaVersion
return schema.Name.O, table.Name.O, sql, nil
schemaName = schema.Name.O
tableName = table.Name.O
s.truncateTableID[job.TableID] = struct{}{}

default:
binlogInfo := job.BinlogInfo
@@ -396,8 +403,17 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) {

s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schema.Name.O, tbInfo.Name.O}
s.currentVersion = job.BinlogInfo.SchemaVersion
return schema.Name.O, tbInfo.Name.O, sql, nil
schemaName = schema.Name.O
tableName = tbInfo.Name.O
}

return
}

// IsTruncateTableID returns whether the table id was truncated by a TRUNCATE TABLE DDL
func (s *Schema) IsTruncateTableID(id int64) bool {
_, ok := s.truncateTableID[id]
return ok
}

func (s *Schema) getSchemaTableAndDelete(version int64) (string, string, error) {
@@ -425,11 +441,10 @@ func addImplicitColumn(table *model.TableInfo) {
table.Indices = []*model.IndexInfo{newIndex}
}

// there are only two statuses that can appear in HistoryDDLJob (which we fetch at start time):
// JobStateSynced and JobStateRollbackDone
// If the commit (to TiKV) fails in the 2PC phase (when changing JobStateDone -> JobStateSynced and adding to HistoryDDLJob),
// then the job won't be added to HistoryDDLJob and we may get (prewrite + rollback binlog);
// this binlog event still reaches drainer, and we finally get (prewrite + commit binlog) when TiDB retries and commits successfully
// TiDB writes a DDL binlog for every DDL job, so we must ignore jobs that are cancelled or rolled back.
// Older versions of TiDB write the DDL binlog in the txn that changes the job state to *synced*.
// Now it is written in the txn that changes the job state to *done* (before the change to *synced*).
// At state *done*, the job will always and only be changed to *synced*.
func skipJob(job *model.Job) bool {
return !job.IsSynced()
return !job.IsSynced() && !job.IsDone()
}
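Before moving on to syncer.go, the skipJob change deserves a note: the DDL binlog is now written when a job reaches the *done* state, so done jobs must be processed in addition to synced ones, while cancelled and rolled-back jobs are still ignored. A tiny stand-alone sketch of the predicate; JobState here is a simplified stand-in for the model package's job states.

package main

import "fmt"

// JobState is a simplified stand-in for the DDL job states that
// matter to skipJob; the real model package defines more of them.
type JobState int

const (
	JobStateDone JobState = iota
	JobStateSynced
	JobStateCancelled
	JobStateRollbackDone
)

func (s JobState) String() string {
	return [...]string{"done", "synced", "cancelled", "rollback-done"}[s]
}

// skipJob mirrors the updated predicate: accept *done* as well as
// *synced*, skip everything else.
func skipJob(state JobState) bool {
	return state != JobStateSynced && state != JobStateDone
}

func main() {
	for _, st := range []JobState{JobStateDone, JobStateSynced, JobStateCancelled, JobStateRollbackDone} {
		fmt.Printf("state=%-13s skip=%v\n", st, skipJob(st))
	}
	// done and synced are processed; cancelled and rollback-done are skipped
}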
35 changes: 32 additions & 3 deletions drainer/syncer.go
@@ -48,9 +48,6 @@ type Syncer struct {
positions map[string]int64
initCommitTS int64

// because TiDB is case-insensitive, only lower-case here.
ignoreSchemaNames map[string]struct{}

ctx context.Context
cancel context.CancelFunc

@@ -418,6 +415,9 @@ func (s *Syncer) run(jobs []*model.Job) error {
}
}
s.schema, err = NewSchema(jobs, false)
if err != nil {
return errors.Trace(err)
}

s.executors, err = createExecutors(s.cfg.DestDBType, s.cfg.To, s.cfg.WorkerCount)
if err != nil {
@@ -436,6 +436,7 @@ func (s *Syncer) run(jobs []*model.Job) error {
go s.sync(s.executors[i], s.jobCh[i], i)
}

var lastDDLSchemaVersion int64
var b *binlogItem
for {
select {
@@ -463,7 +464,16 @@ func (s *Syncer) run(jobs []*model.Job) error {
return errors.Errorf("prewrite %s unmarshal error %v", preWriteValue, err)
}

err = s.rewriteForOldVersion(preWrite)
if err != nil {
return errors.Annotate(err, "rewrite for old version fail")
}

log.Debug("DML SchemaVersion: ", preWrite.SchemaVersion)
if preWrite.SchemaVersion < lastDDLSchemaVersion {
log.Debug("encounter older schema dml")
}

err = s.schema.handlePreviousDDLJobIfNeed(preWrite.SchemaVersion)
if err != nil {
return errors.Trace(err)
@@ -481,6 +491,8 @@ func (s *Syncer) run(jobs []*model.Job) error {
s.schema.addJob(b.job)

log.Debug("DDL SchemaVersion: ", b.job.BinlogInfo.SchemaVersion)
lastDDLSchemaVersion = b.job.BinlogInfo.SchemaVersion

err = s.schema.handlePreviousDDLJobIfNeed(b.job.BinlogInfo.SchemaVersion)
if err != nil {
return errors.Trace(err)
@@ -645,3 +657,20 @@ func (s *Syncer) GetLastSyncTime() time.Time {
func (s *Syncer) GetLatestCommitTS() int64 {
return s.cp.TS()
}

// see https://github.com/pingcap/tidb/issues/9304
// currently, we only drop the data whose table id belongs to a truncated table.
func (s *Syncer) rewriteForOldVersion(pv *pb.PrewriteValue) (err error) {
var mutations []pb.TableMutation
for _, mutation := range pv.GetMutations() {
if s.schema.IsTruncateTableID(mutation.TableId) {
log.Infof("skip old version truncate dml, table id: %d", mutation.TableId)
continue
}

mutations = append(mutations, mutation)
}
pv.Mutations = mutations

return nil
}
4 changes: 2 additions & 2 deletions drainer/syncer_test.go
@@ -24,7 +24,7 @@ func (t *testDrainerSuite) TestHandleDDL(c *C) {
c.Assert(sql, Equals, "")

// check job.Query is empty
job = &model.Job{ID: 1, State: model.JobStateSynced}
job = &model.Job{ID: 1, State: model.JobStateDone}
_, _, sql, err = s.schema.handleDDL(job)
c.Assert(sql, Equals, "")
c.Assert(err, NotNil, Commentf("should return not found job.Query"))
@@ -82,7 +82,7 @@ func (t *testDrainerSuite) TestHandleDDL(c *C) {

job = &model.Job{
ID: testCase.jobID,
State: model.JobStateSynced,
State: model.JobStateDone,
SchemaID: testCase.schemaID,
TableID: testCase.tableID,
Type: testCase.jobType,
9 changes: 0 additions & 9 deletions drainer/translator/flash_util.go
@@ -87,15 +87,6 @@ func isHandleTypeColumn(colDef *ast.ColumnDef) bool {
tp == mysql.TypeLonglong
}

func toFlashColumnTypeMap(columns []*model.ColumnInfo) map[int64]*types.FieldType {
colTypeMap := make(map[int64]*types.FieldType)
for _, col := range columns {
colTypeMap[col.ID] = &col.FieldType
}

return colTypeMap
}

func makeRow(pk int64, values []interface{}, version uint64, delFlag uint8) []interface{} {
var row []interface{}
row = append(row, pk)
65 changes: 20 additions & 45 deletions drainer/translator/kafka.go
@@ -134,17 +134,12 @@ func insertRowToRow(tableInfo *model.TableInfo, raw []byte) (row *obinlog.Row, e
row = new(obinlog.Row)

for _, col := range columns {
var column *obinlog.Column
val, ok := columnValues[col.ID]
if ok {
column = DatumToColumn(col, val)
} else {
if col.DefaultValue == nil {
column = nullColumn()
} else {
log.Fatal("can't find value col: ", col, "default value: ", col.DefaultValue)
}
if !ok {
val = types.NewDatum(col.DefaultValue)
}

column := DatumToColumn(col, val)
row.Columns = append(row.Columns, column)
}

@@ -167,17 +162,12 @@ func deleteRowToRow(tableInfo *model.TableInfo, raw []byte) (row *obinlog.Row, e
row = new(obinlog.Row)

for _, col := range columns {
var column *obinlog.Column
val, ok := columnValues[col.ID]
if ok {
column = DatumToColumn(col, val)
} else {
if col.DefaultValue == nil {
column = nullColumn()
} else {
log.Fatal("can't find value col: ", col, "default value: ", col.DefaultValue)
}
if !ok {
val = types.NewDatum(col.DefaultValue)
}

column := DatumToColumn(col, val)
row.Columns = append(row.Columns, column)
}

@@ -194,28 +184,20 @@ func updateRowToRow(tableInfo *model.TableInfo, raw []byte) (row *obinlog.Row, c
row = new(obinlog.Row)
changedRow = new(obinlog.Row)
for _, col := range tableInfo.Columns {
if val, ok := newDatums[col.ID]; ok {
column := DatumToColumn(col, val)
row.Columns = append(row.Columns, column)
} else {
if col.DefaultValue == nil {
column := nullColumn()
row.Columns = append(row.Columns, column)
} else {
log.Fatal("can't find value col: ", col, "default value: ", col.DefaultValue)
}
var val types.Datum
var ok bool

if val, ok = newDatums[col.ID]; !ok {
val = types.NewDatum(col.DefaultValue)
}
if val, ok := oldDatums[col.ID]; ok {
column := DatumToColumn(col, val)
changedRow.Columns = append(changedRow.Columns, column)
} else {
if col.DefaultValue == nil {
column := nullColumn()
row.Columns = append(row.Columns, column)
} else {
log.Fatal("can't find value col: ", col, "default value: ", col.DefaultValue)
}
column := DatumToColumn(col, val)
row.Columns = append(row.Columns, column)

if val, ok = oldDatums[col.ID]; !ok {
val = types.NewDatum(col.DefaultValue)
}
column = DatumToColumn(col, val)
changedRow.Columns = append(changedRow.Columns, column)
}

return
@@ -289,10 +271,3 @@ func DatumToColumn(colInfo *model.ColumnInfo, datum types.Datum) (col *obinlog.C

return
}

func nullColumn() (col *obinlog.Column) {
col = new(obinlog.Column)
col.IsNull = proto.Bool(true)

return
}
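The kafka.go changes above all apply one pattern: when the decoded mutation carries no value for a column, fall back to the column's default value (via types.NewDatum(col.DefaultValue)) instead of emitting a NULL column or logging fatally. A self-contained sketch of that fallback; Datum, ColumnInfo and Column are simplified stand-ins for types.Datum, model.ColumnInfo and obinlog.Column.

package main

import "fmt"

// Datum, ColumnInfo and Column are simplified stand-ins for the
// types.Datum, model.ColumnInfo and obinlog.Column types.
type Datum struct{ Value interface{} }

func NewDatum(v interface{}) Datum { return Datum{Value: v} }

type ColumnInfo struct {
	ID           int64
	Name         string
	DefaultValue interface{} // nil means the default is NULL
}

type Column struct{ Value interface{} }

// datumToColumn stands in for DatumToColumn; it only copies the value.
func datumToColumn(_ *ColumnInfo, d Datum) *Column { return &Column{Value: d.Value} }

// rowFromValues shows the fallback introduced in kafka.go: if the
// mutation has no value for a column, use the column default
// (possibly nil, i.e. NULL) rather than aborting.
func rowFromValues(cols []*ColumnInfo, values map[int64]Datum) []*Column {
	var row []*Column
	for _, col := range cols {
		val, ok := values[col.ID]
		if !ok {
			val = NewDatum(col.DefaultValue)
		}
		row = append(row, datumToColumn(col, val))
	}
	return row
}

func main() {
	cols := []*ColumnInfo{
		{ID: 1, Name: "id"},
		{ID: 2, Name: "status", DefaultValue: "new"},
	}
	// the decoded mutation only carries a value for column 1
	row := rowFromValues(cols, map[int64]Datum{1: NewDatum(int64(7))})
	fmt.Println(row[0].Value, row[1].Value) // 7 new
}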
2 changes: 2 additions & 0 deletions drainer/translator/mysql.go
@@ -9,6 +9,7 @@ import (
"sync/atomic"
"time"

"github.com/ngaut/log"
"github.com/pingcap/errors"
"github.com/pingcap/parser"
"github.com/pingcap/parser/ast"
@@ -67,6 +68,7 @@ func (m *mysqlTranslator) GenInsertSQLs(schema string, table *model.TableInfo, r
for _, col := range columns {
val, ok := columnValues[col.ID]
if !ok {
log.Infof("get default value: %+v from schema directly", col.DefaultValue)
vals = append(vals, col.DefaultValue)
} else {
value, err := formatData(val, col.FieldType)