Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

drainer: bugfix, handle "missing column" when a column is getting dropped #827

Merged
merged 3 commits into from
Nov 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion drainer/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,9 +283,13 @@ func (c *Collector) syncBinlog(item *binlogItem) error {

log.Info("get ddl job", zap.Stringer("job", job))

if skipJob(job) {
isDelOnlyEvent := model.SchemaState(binlog.DdlSchemaState) == model.StateDeleteOnly
if skipJob(job) && !isDelOnlyEvent {
return nil
}
if isDelOnlyEvent {
job.SchemaState = model.StateDeleteOnly
}
item.SetJob(job)
ddlJobsCounter.Add(float64(1))
}
Expand Down
51 changes: 33 additions & 18 deletions drainer/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Schema struct {
tables map[int64]*model.TableInfo

truncateTableID map[int64]struct{}
tblsDroppingCol map[int64]bool

schemaMetaVersion int64

Expand All @@ -56,6 +57,7 @@ func NewSchema(jobs []*model.Job, hasImplicitCol bool) (*Schema, error) {
hasImplicitCol: hasImplicitCol,
version2SchemaTable: make(map[int64]TableName),
truncateTableID: make(map[int64]struct{}),
tblsDroppingCol: make(map[int64]bool),
jobs: jobs,
}

Expand Down Expand Up @@ -231,25 +233,27 @@ func (s *Schema) addJob(job *model.Job) {
func (s *Schema) handlePreviousDDLJobIfNeed(version int64) error {
var i int
for i = 0; i < len(s.jobs); i++ {
if skipJob(s.jobs[i]) {
log.Debug("skip ddl job", zap.Stringer("job", s.jobs[i]))
job := s.jobs[i]

if job.BinlogInfo.SchemaVersion > version {
break
}

if job.BinlogInfo.SchemaVersion <= s.currentVersion {
log.Warn("ddl job schema version is less than current version, skip this ddl job",
zap.Stringer("job", job),
zap.Int64("currentVersion", s.currentVersion))
continue
}

if s.jobs[i].BinlogInfo.SchemaVersion <= version {
if s.jobs[i].BinlogInfo.SchemaVersion <= s.currentVersion {
log.Warn("ddl job schema version is less than current version, skip this ddl job",
zap.Stringer("job", s.jobs[i]),
zap.Int64("currentVersion", s.currentVersion))
continue
}

_, _, _, err := s.handleDDL(s.jobs[i])
if err != nil {
return errors.Annotatef(err, "handle ddl job %v failed, the schema info: %s", s.jobs[i], s)
}
} else {
break
if job.SchemaState == model.StateDeleteOnly && job.Type == model.ActionDropColumn {
s.tblsDroppingCol[job.TableID] = true
log.Info("Got DeleteOnly Job", zap.Stringer("job", job))
continue
}
_, _, _, err := s.handleDDL(job)
if err != nil {
return errors.Annotatef(err, "handle ddl job %v failed, the schema info: %s", s.jobs[i], s)
}
}

Expand All @@ -264,12 +268,13 @@ func (s *Schema) handlePreviousDDLJobIfNeed(version int64) error {
// the third value[string]: the sql that is corresponding to the job
// the fourth value[error]: the handleDDL execution's err
func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string, sql string, err error) {
log.Debug("handle job: ", zap.Stringer("job", job))

if skipJob(job) {
log.Debug("Skip job", zap.Stringer("job", job))
return "", "", "", nil
}

log.Debug("Handle job", zap.Stringer("job", job))

sql = job.Query
if sql == "" {
return "", "", "", errors.Errorf("[ddl job sql miss]%+v", job)
Expand Down Expand Up @@ -426,11 +431,21 @@ func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string,
s.currentVersion = job.BinlogInfo.SchemaVersion
schemaName = schema.Name.O
tableName = tbInfo.Name.O

if job.Type == model.ActionDropColumn {
log.Info("Finished dropping column", zap.Stringer("job", job))
delete(s.tblsDroppingCol, job.TableID)
}
}

return
}

// IsDroppingColumn returns true if the table is in the middle of dropping a column
func (s *Schema) IsDroppingColumn(id int64) bool {
return s.tblsDroppingCol[id]
}

// IsTruncateTableID returns true if the table id have been truncated by truncate table DDL
func (s *Schema) IsTruncateTableID(id int64) bool {
_, ok := s.truncateTableID[id]
Expand Down
2 changes: 1 addition & 1 deletion drainer/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (t *schemaSuite) TestSchema(c *C) {
jobs = append(jobs, job)

// construct a rollbackdone job
jobs = append(jobs, &model.Job{ID: 5, State: model.JobStateRollbackDone})
jobs = append(jobs, &model.Job{ID: 5, State: model.JobStateRollbackDone, BinlogInfo: &model.HistoryInfo{}})

// reconstruct the local schema
schema, err := NewSchema(jobs, false)
Expand Down
9 changes: 7 additions & 2 deletions drainer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ ForLoop:
lastAddComitTS = binlog.GetCommitTs()
err = s.dsyncer.Sync(&dsync.Item{Binlog: binlog, PrewriteValue: preWrite})
if err != nil {
err = errors.Annotate(err, "add to dsyncer failed")
err = errors.Annotatef(err, "add to dsyncer, commit ts %d", binlog.CommitTs)
break ForLoop
}
executeHistogram.Observe(time.Since(beginTime).Seconds())
Expand All @@ -383,6 +383,11 @@ ForLoop:
break ForLoop
}

if b.job.SchemaState == model.StateDeleteOnly && b.job.Type == model.ActionDropColumn {
log.Info("Syncer skips DeleteOnly DDL", zap.Stringer("job", b.job), zap.Int64("ts", b.GetCommitTs()))
continue
}

sql := b.job.Query
var schema, table string
schema, table, err = s.schema.getSchemaTableAndDelete(b.job.BinlogInfo.SchemaVersion)
Expand All @@ -404,7 +409,7 @@ ForLoop:

err = s.dsyncer.Sync(&dsync.Item{Binlog: binlog, PrewriteValue: nil, Schema: schema, Table: table})
if err != nil {
err = errors.Annotate(err, "add to dsyncer failed")
err = errors.Annotatef(err, "add to dsyncer, commit ts %d", binlog.CommitTs)
break ForLoop
}
executeHistogram.Observe(time.Since(beginTime).Seconds())
Expand Down
7 changes: 4 additions & 3 deletions drainer/translator/flash.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func GenFlashSQLs(infoGetter TableInfoGetter, pv *tipb.PrewriteValue, commitTS i
if !ok {
return nil, nil, errors.Errorf("TableByID empty table id: %d", mut.GetTableId())
}
isTblDroppingCol := infoGetter.IsDroppingColumn(mut.GetTableId())

var schema string
schema, _, ok = infoGetter.SchemaAndTableName(mut.GetTableId())
Expand All @@ -68,7 +69,7 @@ func GenFlashSQLs(infoGetter TableInfoGetter, pv *tipb.PrewriteValue, commitTS i
return nil, nil, errors.Annotate(err, "gen insert sql fail")
}
case tipb.MutationType_Update:
sql, args, err = GenFlashUpdateSQL(schema, info, row, commitTS)
sql, args, err = GenFlashUpdateSQL(schema, info, row, commitTS, isTblDroppingCol)
if err != nil {
return nil, nil, errors.Annotate(err, "gen update sql fail")
}
Expand Down Expand Up @@ -140,15 +141,15 @@ func GenFlashInsertSQL(schema string, table *model.TableInfo, row []byte, commit
}

// GenFlashUpdateSQL generate the SQL need to execute syncing this update row to Flash
func GenFlashUpdateSQL(schema string, table *model.TableInfo, row []byte, commitTS int64) (sql string, args []interface{}, err error) {
func GenFlashUpdateSQL(schema string, table *model.TableInfo, row []byte, commitTS int64, isTblDroppingCol bool) (sql string, args []interface{}, err error) {
schema = strings.ToLower(schema)
pkColumn := pkHandleColumn(table)
if pkColumn == nil {
pkColumn = fakeImplicitColumn(table)
}
pkID := pkColumn.ID

updtDecoder := newUpdateDecoder(table)
updtDecoder := newUpdateDecoder(table, isTblDroppingCol)
version := makeInternalVersionValue(uint64(commitTS))
delFlag := makeInternalDelmarkValue(false)

Expand Down
Loading