Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
DM: fix update lastPos and save checkpoint (#400)
Browse files Browse the repository at this point in the history
  • Loading branch information
csuzhangxc committed Oct 31, 2018
1 parent 92fc36b commit 220aafd
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 26 deletions.
26 changes: 19 additions & 7 deletions syncer/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,32 +52,44 @@ func trimCtrlChars(s string) string {
return strings.TrimFunc(s, f)
}

// resolveDDLSQL resolve to one ddl sql
// example: drop table test.a,test2.b -> drop table test.a; drop table test2.b;
func (s *Syncer) resolveDDLSQL(sql string, p *parser.Parser, schema string) (sqls []string, tables map[string]*filter.Table, isDDL bool, err error) {
func (s *Syncer) parseDDLSQL(sql string, p *parser.Parser, schema string) (ast.StmtNode, bool, error) {
sql = trimCtrlChars(sql)
// We use Parse not ParseOneStmt here, because sometimes we got a commented out ddl which can't be parsed
// by ParseOneStmt(it's a limitation of tidb parser.)
stmts, err := p.Parse(sql, "", "")
if err != nil {
// log error rather than fatal, so other defer can be executed
log.Errorf(IncompatibleDDLFormat, sql)
return []string{sql}, nil, false, errors.Annotatef(err, IncompatibleDDLFormat, sql)
return nil, false, errors.Annotatef(err, IncompatibleDDLFormat, sql)
}

if len(stmts) == 0 {
return nil, nil, false, nil
return nil, false, nil
}

stmt := stmts[0]
switch stmt.(type) {
case ast.DDLNode:
// do nothing
return stmt, true, nil
case ast.DMLNode:
return nil, nil, false, errors.Annotatef(ErrDMLStatementFound, "query %s", sql)
return nil, false, errors.Annotatef(ErrDMLStatementFound, "query %s", sql)
default:
// BEGIN statement is included here.
// let sqls be empty
return nil, false, nil
}
}

// resolveDDLSQL resolve to one ddl sql
// example: drop table test.a,test2.b -> drop table test.a; drop table test2.b;
func (s *Syncer) resolveDDLSQL(sql string, p *parser.Parser, schema string) (sqls []string, tables map[string]*filter.Table, isDDL bool, err error) {
// would remove it later
var stmt ast.StmtNode
stmt, isDDL, err = s.parseDDLSQL(sql, p, schema)
if err != nil {
return []string{sql}, nil, false, errors.Trace(err)
}
if !isDDL {
return nil, nil, false, nil
}

Expand Down
45 changes: 26 additions & 19 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1013,7 +1013,8 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
}
if ignore {
binlogSkippedEventsTotal.WithLabelValues("rows", s.cfg.Name).Inc()
if err = s.recordSkipSQLsPos(currentPos, nil); err != nil {
// for RowsEvent, we should record lastPos rather than currentPos
if err = s.recordSkipSQLsPos(lastPos, nil); err != nil {
return errors.Trace(err)
}

Expand Down Expand Up @@ -1145,35 +1146,48 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
}
}
case *replication.QueryEvent:
currentPos = mysql.Position{
Name: lastPos.Name,
Pos: e.Header.LogPos,
}
sql := strings.TrimSpace(string(ev.Query))
_, isDDL, err := s.parseDDLSQL(sql, parser2, string(ev.Schema))
if err != nil {
log.Infof("[query]%s [last pos]%v [current pos]%v [current gtid set]%v", sql, lastPos, currentPos, ev.GSet)
log.Errorf("fail to be parsed, error %v", err)
return errors.Trace(err)
}

if !isDDL {
// skipped sql maybe not a DDL (like `BEGIN`)
continue
}

if shardingReSync != nil {
shardingReSync.currPos.Pos = e.Header.LogPos
lastPos = shardingReSync.currPos
if shardingReSync.currPos.Compare(shardingReSync.latestPos) >= 0 {
log.Infof("[syncer] sharding group %v re-syncing completed", shardingReSync)
closeShardingSyncer()
} else {
// in re-syncing, we can simply skip all DDLs
log.Debugf("[syncer] skip query event when re-syncing sharding group %v", shardingReSync)
// only update lastPos when the query is a real DDL
lastPos = shardingReSync.currPos
log.Debugf("[syncer] skip query event when re-syncing sharding group %+v", shardingReSync)
}
continue
}

log.Infof("[query]%s [last pos]%v [current pos]%v [current gtid set]%v", sql, lastPos, currentPos, ev.GSet)
lastPos = currentPos // update lastPos, and we have checked `isDDL`
latestOp = ddl
sql := strings.TrimSpace(string(ev.Query))
currentPos = mysql.Position{
Name: lastPos.Name,
Pos: e.Header.LogPos,
}

ignore, err := s.skipQuery(nil, nil, sql)
if err != nil {
return errors.Trace(err)
}
if ignore {
binlogSkippedEventsTotal.WithLabelValues("query", s.cfg.Name).Inc()
log.Warnf("[skip query-sql]%s [schema]:%s", sql, ev.Schema)
lastPos = currentPos // before record skip pos, update lastPos
if err = s.recordSkipSQLsPos(currentPos, nil); err != nil {
if err = s.recordSkipSQLsPos(lastPos, nil); err != nil {
return errors.Trace(err)
}
continue
Expand All @@ -1182,7 +1196,6 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
var (
sqls []string
onlineDDLTableNames map[string]*filter.Table
isDDL bool
)

operator := s.GetOperator(currentPos)
Expand All @@ -1201,18 +1214,12 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
log.Errorf("fail to be parsed, error %v", err)
return errors.Trace(err)
}

if !isDDL {
continue
}
}

if len(onlineDDLTableNames) > 1 {
return errors.NotSupportedf("online ddl changes on multiple table: %s", string(ev.Query))
}

log.Infof("[query]%s [last pos]%v [current pos]%v [current gtid set]%v", sql, lastPos, currentPos, ev.GSet)
lastPos = currentPos // update lastPos
binlogEvent.WithLabelValues("query", s.cfg.Name).Observe(time.Since(startTime).Seconds())

/*
Expand Down Expand Up @@ -1295,7 +1302,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
log.Infof("need handled ddls %v in position %v", needHandleDDLs, currentPos)
if len(needHandleDDLs) == 0 {
log.Infof("skip query %s in position %v", string(ev.Query), currentPos)
if err = s.recordSkipSQLsPos(currentPos, nil); err != nil {
if err = s.recordSkipSQLsPos(lastPos, nil); err != nil {
return errors.Trace(err)
}
continue
Expand Down

0 comments on commit 220aafd

Please sign in to comment.