From dd8030acef4150fe2b149fae63701e78368420c1 Mon Sep 17 00:00:00 2001 From: GMHDBJD <35025882+GMHDBJD@users.noreply.github.com> Date: Thu, 26 Nov 2020 11:40:59 +0800 Subject: [PATCH] cherry pick #1297 to release-2.0 Signed-off-by: ti-srebot --- syncer/handle_error.go | 5 ++++- syncer/syncer.go | 26 ++++++++++++++------------ tests/handle_error/run.sh | 20 ++++---------------- 3 files changed, 22 insertions(+), 29 deletions(-) diff --git a/syncer/handle_error.go b/syncer/handle_error.go index 302c4fd037..03fe614be0 100644 --- a/syncer/handle_error.go +++ b/syncer/handle_error.go @@ -33,10 +33,13 @@ func (s *Syncer) HandleError(ctx context.Context, req *pb.HandleWorkerErrorReque pos := req.BinlogPos if len(pos) == 0 { - startLocation := s.getErrLocation() + startLocation, isQueryEvent := s.getErrLocation() if startLocation == nil { return fmt.Errorf("source '%s' has no error", s.cfg.SourceID) } + if !isQueryEvent { + return fmt.Errorf("only support to handle ddl error currently, see https://docs.pingcap.com/tidb-data-migration/stable/error-handling for other errors") + } pos = startLocation.Position.String() } else { startLocation, err := binlog.VerifyBinlogPos(pos) diff --git a/syncer/syncer.go b/syncer/syncer.go index 450ef47ecb..0faf3bdc7c 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -179,6 +179,7 @@ type Syncer struct { sync.RWMutex startLocation *binlog.Location endLocation *binlog.Location + isQueryEvent bool } addJobFunc func(*job) error @@ -433,7 +434,7 @@ func (s *Syncer) reset() { s.newJobChans(s.cfg.WorkerCount + 1) s.execError.Set(nil) - s.setErrLocation(nil, nil) + s.setErrLocation(nil, nil, false) s.isReplacingErr = false switch s.cfg.ShardMode { @@ -936,7 +937,7 @@ func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *DBConn, if err != nil { s.execError.Set(err) if !utils.IsContextCanceledError(err) { - err = s.handleEventError(err, sqlJob.startLocation, sqlJob.currentLocation) + err = s.handleEventError(err, sqlJob.startLocation, sqlJob.currentLocation, true) s.runFatalChan <- unit.NewProcessError(err) } s.jobWg.Done() @@ -972,7 +973,7 @@ func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *DBConn, if err != nil { s.execError.Set(err) if !utils.IsContextCanceledError(err) { - err = s.handleEventError(err, sqlJob.startLocation, sqlJob.currentLocation) + err = s.handleEventError(err, sqlJob.startLocation, sqlJob.currentLocation, true) s.runFatalChan <- unit.NewProcessError(err) } s.jobWg.Done() @@ -1007,7 +1008,7 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, jo fatalF := func(affected int, err error) { s.execError.Set(err) if !utils.IsContextCanceledError(err) { - err = s.handleEventError(err, jobs[affected].startLocation, jobs[affected].currentLocation) + err = s.handleEventError(err, jobs[affected].startLocation, jobs[affected].currentLocation, false) s.runFatalChan <- unit.NewProcessError(err) } clearF() @@ -1328,11 +1329,11 @@ func (s *Syncer) Run(ctx context.Context) (err error) { tctx.L().Debug("receive binlog event", zap.Reflect("header", e.Header)) // TODO: support all event - // we calculate startLocation and endLocation(currentLocation) for Rows/Query event here + // we calculate startLocation and endLocation(currentLocation) for Query event here // set startLocation empty for other events to avoid misuse startLocation = binlog.Location{} switch e.Event.(type) { - case *replication.RowsEvent, *replication.QueryEvent: + case *replication.QueryEvent: startLocation = binlog.InitLocation( mysql.Position{ Name: lastLocation.Position.Name, @@ -1469,7 +1470,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } } if err2 != nil { - if err := s.handleEventError(err2, startLocation, currentLocation); err != nil { + if err := s.handleEventError(err2, startLocation, currentLocation, e.Header.EventType == replication.QUERY_EVENT); err != nil { return err } } @@ -2709,10 +2710,11 @@ func (s *Syncer) ShardDDLOperation() *pessimism.Operation { return s.pessimist.PendingOperation() } -func (s *Syncer) setErrLocation(startLocation, endLocation *binlog.Location) { +func (s *Syncer) setErrLocation(startLocation, endLocation *binlog.Location, isQueryEventEvent bool) { s.errLocation.Lock() defer s.errLocation.Unlock() + s.errLocation.isQueryEvent = isQueryEventEvent if s.errLocation.startLocation == nil || startLocation == nil { s.errLocation.startLocation = startLocation } else if binlog.CompareLocation(*startLocation, *s.errLocation.startLocation, s.cfg.EnableGTID) < 0 { @@ -2726,18 +2728,18 @@ func (s *Syncer) setErrLocation(startLocation, endLocation *binlog.Location) { } } -func (s *Syncer) getErrLocation() *binlog.Location { +func (s *Syncer) getErrLocation() (*binlog.Location, bool) { s.errLocation.Lock() defer s.errLocation.Unlock() - return s.errLocation.startLocation + return s.errLocation.startLocation, s.errLocation.isQueryEvent } -func (s *Syncer) handleEventError(err error, startLocation, endLocation binlog.Location) error { +func (s *Syncer) handleEventError(err error, startLocation, endLocation binlog.Location, isQueryEvent bool) error { if err == nil { return nil } - s.setErrLocation(&startLocation, &endLocation) + s.setErrLocation(&startLocation, &endLocation, isQueryEvent) return terror.Annotatef(err, "startLocation: [%s], endLocation: [%s]", startLocation, endLocation) } diff --git a/tests/handle_error/run.sh b/tests/handle_error/run.sh index ec86427348..efe98f7b5c 100644 --- a/tests/handle_error/run.sh +++ b/tests/handle_error/run.sh @@ -40,32 +40,20 @@ function DM_SKIP_ERROR_CASE() { # skip one source run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "handle-error test skip -s mysql-replica-01" \ - "\"result\": true" 2 + "only support to handle ddl error currently" 1 run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status test" \ - "\"stage\": \"Running\"" 1 \ - "\"stage\": \"Paused\"" 1 + "\"stage\": \"Paused\"" 2 # skip all sources run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "handle-error test skip" \ - "\"result\": true" 2 \ - "\"source 'mysql-replica-01' has no error\"" 1 + "only support to handle ddl error currently" 2 run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status test" \ - "\"stage\": \"Running\"" 2 - - # '11' -> 11, '22' -> 22, no error - run_sql_source1 "insert into ${db}.${tb1} values('111',7)" - run_sql_source2 "insert into ${db}.${tb2} values('222',8)" - run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "query-status test" \ - "\"stage\": \"Running\"" 2 - - run_sql_tidb_with_retry "select count(1) from ${db}.${tb1} where id=111;" "count(1): 1" - run_sql_tidb_with_retry "select count(1) from ${db}.${tb2} where id=222;" "count(1): 1" + "\"stage\": \"Paused\"" 2 } function DM_SKIP_ERROR() {