Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix affect of online DDL #466

Merged
merged 12 commits into from
Feb 27, 2019
49 changes: 32 additions & 17 deletions drainer/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ type Schema struct {
schemas map[int64]*model.DBInfo
tables map[int64]*model.TableInfo

truncateTableID map[int64]struct{}

schemaMetaVersion int64

hasImplicitCol bool
Expand All @@ -41,6 +43,7 @@ func NewSchema(jobs []*model.Job, hasImplicitCol bool) (*Schema, error) {
s := &Schema{
hasImplicitCol: hasImplicitCol,
version2SchemaTable: make(map[int64]TableName),
truncateTableID: make(map[int64]struct{}),
jobs: jobs,
}

Expand Down Expand Up @@ -253,12 +256,12 @@ func (s *Schema) handlePreviousDDLJobIfNeed(version int64) error {
// the second value[string]: the table name
// the third value[string]: the sql that is corresponding to the job
// the fourth value[error]: the handleDDL execution's err
func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) {
func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string, sql string, err error) {
if skipJob(job) {
return "", "", "", nil
}

sql := job.Query
sql = job.Query
if sql == "" {
return "", "", "", errors.Errorf("[ddl job sql miss]%+v", job)
}
Expand All @@ -275,17 +278,16 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) {

s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schema.Name.O, ""}
s.currentVersion = job.BinlogInfo.SchemaVersion
return schema.Name.O, "", sql, nil
schemaName = schema.Name.O

case model.ActionDropSchema:
schemaName, err := s.DropSchema(job.SchemaID)
schemaName, err = s.DropSchema(job.SchemaID)
if err != nil {
return "", "", "", errors.Trace(err)
}

s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schemaName, ""}
s.currentVersion = job.BinlogInfo.SchemaVersion
return schemaName, "", sql, nil

case model.ActionRenameTable:
// ignore schema doesn't support reanme ddl
Expand All @@ -312,7 +314,8 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) {

s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schema.Name.O, table.Name.O}
s.currentVersion = job.BinlogInfo.SchemaVersion
return schema.Name.O, table.Name.O, sql, nil
schemaName = schema.Name.O
tableName = table.Name.O

case model.ActionCreateTable:
table := job.BinlogInfo.TableInfo
Expand All @@ -332,29 +335,31 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) {

s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schema.Name.O, table.Name.O}
s.currentVersion = job.BinlogInfo.SchemaVersion
return schema.Name.O, table.Name.O, sql, nil
schemaName = schema.Name.O
tableName = table.Name.O

case model.ActionDropTable:
schema, ok := s.SchemaByID(job.SchemaID)
if !ok {
return "", "", "", errors.NotFoundf("schema %d", job.SchemaID)
}

tableName, err := s.DropTable(job.TableID)
tableName, err = s.DropTable(job.TableID)
if err != nil {
return "", "", "", errors.Trace(err)
}

s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schema.Name.O, tableName}
s.currentVersion = job.BinlogInfo.SchemaVersion
return schema.Name.O, tableName, sql, nil
schemaName = schema.Name.O

case model.ActionTruncateTable:
schema, ok := s.SchemaByID(job.SchemaID)
if !ok {
return "", "", "", errors.NotFoundf("schema %d", job.SchemaID)
}

// job.TableID is the old table id, different from table.ID
_, err := s.DropTable(job.TableID)
if err != nil {
return "", "", "", errors.Trace(err)
Expand All @@ -372,7 +377,9 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) {

s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schema.Name.O, table.Name.O}
s.currentVersion = job.BinlogInfo.SchemaVersion
return schema.Name.O, table.Name.O, sql, nil
schemaName = schema.Name.O
tableName = table.Name.O
s.truncateTableID[job.TableID] = struct{}{}

default:
binlogInfo := job.BinlogInfo
Expand All @@ -396,8 +403,17 @@ func (s *Schema) handleDDL(job *model.Job) (string, string, string, error) {

s.version2SchemaTable[job.BinlogInfo.SchemaVersion] = TableName{schema.Name.O, tbInfo.Name.O}
s.currentVersion = job.BinlogInfo.SchemaVersion
return schema.Name.O, tbInfo.Name.O, sql, nil
schemaName = schema.Name.O
tableName = tbInfo.Name.O
}

return
}

// IsTruncateTableID returns true if the table id have been truncated by truncate table DDL
func (s *Schema) IsTruncateTableID(id int64) bool {
_, ok := s.truncateTableID[id]
return ok
}

func (s *Schema) getSchemaTableAndDelete(version int64) (string, string, error) {
Expand Down Expand Up @@ -425,11 +441,10 @@ func addImplicitColumn(table *model.TableInfo) {
table.Indices = []*model.IndexInfo{newIndex}
}

// there's only two status will be in HistoryDDLJob(we fetch at start time):
// JobStateSynced and JobStateRollbackDone
// If it fail to commit(to tikv) in 2pc phrase (when changing JobStateDone -> JobStateSynced and add to HistoryDDLJob),
// then is would't not be add to HistoryDDLJob, and we may get (prewrite + rollback binlog),
// this binlog event would reach drainer, finally we will get a (p + commit binlog) when tidb retry and successfully commit
// TiDB write DDL Binlog for every DDL Job, we must ignore jobs that are cancelled or rollback
// For older version TiDB, it write DDL Binlog in the txn that the state of job is changed to *synced*
// Now, it write DDL Binlog in the txn that the state of job is changed to *done* (before change to *synced*)
// At state *done*, it will be always and only changed to *synced*.
func skipJob(job *model.Job) bool {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@WangXiangUSTC @GregoryIan
if upgrade to TiDB with the write ddl binlog when change state to done, must upgrade drainer too, or may there's risk losing ddl event(skip here because it's still at done state)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

send an email to business team @july2993

return !job.IsSynced()
return !job.IsSynced() && !job.IsDone()
}
38 changes: 35 additions & 3 deletions drainer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@ type Syncer struct {
positions map[string]int64
initCommitTS int64

// because TiDB is case-insensitive, only lower-case here.
ignoreSchemaNames map[string]struct{}

ctx context.Context
cancel context.CancelFunc

Expand Down Expand Up @@ -418,6 +415,9 @@ func (s *Syncer) run(jobs []*model.Job) error {
}
}
s.schema, err = NewSchema(jobs, false)
if err != nil {
return errors.Trace(err)
}

s.executors, err = createExecutors(s.cfg.DestDBType, s.cfg.To, s.cfg.WorkerCount)
if err != nil {
Expand All @@ -436,6 +436,7 @@ func (s *Syncer) run(jobs []*model.Job) error {
go s.sync(s.executors[i], s.jobCh[i], i)
}

var lastDDLSchemaVersion int64
var b *binlogItem
for {
select {
Expand Down Expand Up @@ -463,7 +464,16 @@ func (s *Syncer) run(jobs []*model.Job) error {
return errors.Errorf("prewrite %s unmarshal error %v", preWriteValue, err)
}

err = s.rewriteForOldVersion(preWrite)
if err != nil {
return errors.Annotate(err, "rewrite for old version fail")
}

log.Debug("DML SchemaVersion: ", preWrite.SchemaVersion)
if preWrite.SchemaVersion < lastDDLSchemaVersion {
log.Debug("encounter older schema dml")
}

err = s.schema.handlePreviousDDLJobIfNeed(preWrite.SchemaVersion)
if err != nil {
return errors.Trace(err)
Expand All @@ -481,6 +491,8 @@ func (s *Syncer) run(jobs []*model.Job) error {
s.schema.addJob(b.job)

log.Debug("DDL SchemaVersion: ", b.job.BinlogInfo.SchemaVersion)
lastDDLSchemaVersion = b.job.BinlogInfo.SchemaVersion

err = s.schema.handlePreviousDDLJobIfNeed(b.job.BinlogInfo.SchemaVersion)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -645,3 +657,23 @@ func (s *Syncer) GetLastSyncTime() time.Time {
func (s *Syncer) GetLatestCommitTS() int64 {
return s.cp.TS()
}

// see https://github.com/pingcap/tidb/issues/9304
// currently, we only drop the data which table id is truncated.
// because of online DDL, different TiDB instance may see the different schema,
// it can't be treated simply as one timeline consider both DML and DDL,
// we must carefully handle every DDL type now and need to find a better design.
func (s *Syncer) rewriteForOldVersion(pv *pb.PrewriteValue) (err error) {
var mutations = make([]pb.TableMutation, 0, len(pv.GetMutations()))
for _, mutation := range pv.GetMutations() {
if s.schema.IsTruncateTableID(mutation.TableId) {
log.Infof("skip old version truncate dml, table id: %d", mutation.TableId)
continue
}

mutations = append(mutations, mutation)
}
pv.Mutations = mutations

return nil
}
4 changes: 2 additions & 2 deletions drainer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (t *testDrainerSuite) TestHandleDDL(c *C) {
c.Assert(sql, Equals, "")

// check job.Query is empty
job = &model.Job{ID: 1, State: model.JobStateSynced}
job = &model.Job{ID: 1, State: model.JobStateDone}
IANTHEREAL marked this conversation as resolved.
Show resolved Hide resolved
_, _, sql, err = s.schema.handleDDL(job)
c.Assert(sql, Equals, "")
c.Assert(err, NotNil, Commentf("should return not found job.Query"))
Expand Down Expand Up @@ -82,7 +82,7 @@ func (t *testDrainerSuite) TestHandleDDL(c *C) {

job = &model.Job{
ID: testCase.jobID,
State: model.JobStateSynced,
State: model.JobStateDone,
SchemaID: testCase.schemaID,
TableID: testCase.tableID,
Type: testCase.jobType,
Expand Down
24 changes: 2 additions & 22 deletions drainer/translator/flash.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
)

// flashTranslator translates TiDB binlog to flash sqls
Expand All @@ -42,42 +41,23 @@ func (f *flashTranslator) GenInsertSQLs(schema string, table *model.TableInfo, r
version := makeInternalVersionValue(uint64(commitTS))
delFlag := makeInternalDelmarkValue(false)

colsTypeMap := toFlashColumnTypeMap(columns)
columnList := genColumnList(columns)
// addition 2 holder is for del flag and version
columnPlaceholders := dml.GenColumnPlaceholders(len(columns) + 2)
sql := fmt.Sprintf("IMPORT INTO `%s`.`%s` (%s) values (%s);", schema, table.Name.L, columnList, columnPlaceholders)

for _, row := range rows {
//decode the pk value
remain, pk, err := codec.DecodeOne(row)
hashKey := pk.GetInt64()
pk, columnValues, err := insertRowToDatums(table, row)
if err != nil {
return nil, nil, nil, errors.Trace(err)
}

columnValues, err := tablecodec.DecodeRow(remain, colsTypeMap, gotime.Local)
if err != nil {
return nil, nil, nil, errors.Trace(err)
}

if columnValues == nil {
columnValues = make(map[int64]types.Datum)
}
hashKey := pk.GetInt64()

var vals []interface{}
vals = append(vals, hashKey)
for _, col := range columns {
if IsPKHandleColumn(table, col) {
columnValues[col.ID] = pk
pkVal, err := formatFlashData(&pk, &col.FieldType)
if err != nil {
return nil, nil, nil, errors.Trace(err)
}
vals = append(vals, pkVal)
continue
}

val, ok := columnValues[col.ID]
if !ok {
vals = append(vals, col.DefaultValue)
Expand Down
9 changes: 0 additions & 9 deletions drainer/translator/flash_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,6 @@ func isHandleTypeColumn(colDef *ast.ColumnDef) bool {
tp == mysql.TypeLonglong
}

func toFlashColumnTypeMap(columns []*model.ColumnInfo) map[int64]*types.FieldType {
colTypeMap := make(map[int64]*types.FieldType)
for _, col := range columns {
colTypeMap[col.ID] = &col.FieldType
}

return colTypeMap
}

func makeRow(pk int64, values []interface{}, version uint64, delFlag uint8) []interface{} {
var row []interface{}
row = append(row, pk)
Expand Down
Loading