From de0f87850337c894ed929ee0936fa5798f310734 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 13 Dec 2021 20:44:29 +0800 Subject: [PATCH 1/2] syncer(dm): use an early location to reset binlog and open safemode --- dm/syncer/syncer.go | 21 +++++++++++++- dm/tests/_utils/check_sync_diff | 2 +- dm/tests/duplicate_event/run.sh | 46 ------------------------------- dm/tests/others_integration_2.txt | 1 + 4 files changed, 22 insertions(+), 48 deletions(-) diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go index 96437d3bf67..aa63a10e4c9 100644 --- a/dm/syncer/syncer.go +++ b/dm/syncer/syncer.go @@ -1554,6 +1554,9 @@ func (s *Syncer) Run(ctx context.Context) (err error) { return nil } + inFinerRetry := false + // in release branch, we only use eventIndex to test a bug + eventIndex := 0 for { if s.execError.Load() != nil { return nil @@ -1595,6 +1598,14 @@ func (s *Syncer) Run(ctx context.Context) (err error) { err = errors.New("connect: connection refused") } }) + failpoint.Inject("GetEventErrorInTxn", func(val failpoint.Value) { + if intVal, ok := val.(int); ok && intVal == eventIndex { + err = errors.New("failpoint triggered") + s.tctx.L().Warn("failed to get event", zap.Int("event_index", eventIndex), + zap.Any("cur_pos", currentLocation), zap.Any("las_pos", lastLocation), + zap.Any("pos", e.Header.LogPos), log.ShortError(err)) + } + }) switch { case err == context.Canceled: tctx.L().Info("binlog replication main routine quit(context canceled)!", zap.Stringer("last location", lastLocation)) @@ -1621,11 +1632,13 @@ func (s *Syncer) Run(ctx context.Context) (err error) { if s.streamerController.CanRetry(err) { // lastLocation is the last finished GTID - err = s.streamerController.ResetReplicationSyncer(tctx, lastLocation) + err = s.streamerController.ResetReplicationSyncer(tctx, s.checkpoint.GlobalPoint()) if err != nil { return err } log.L().Info("reset replication binlog puller", zap.Any("pos", s.checkpoint.GlobalPoint())) + _ = s.safeMode.Add(tctx, 1) + inFinerRetry = true continue } @@ -1777,12 +1790,18 @@ func (s *Syncer) Run(ctx context.Context) (err error) { case *replication.RotateEvent: err2 = s.handleRotateEvent(ev, ec) case *replication.RowsEvent: + eventIndex++ metrics.BinlogEventRowHistogram.WithLabelValues(s.cfg.WorkerName, s.cfg.Name, s.cfg.SourceID).Observe(float64(len(ev.Rows))) err2 = s.handleRowsEvent(ev, ec) case *replication.QueryEvent: originSQL = strings.TrimSpace(string(ev.Query)) err2 = s.handleQueryEvent(ev, ec, originSQL) case *replication.XIDEvent: + eventIndex = 0 + if inFinerRetry { + inFinerRetry = false + _ = s.safeMode.Add(tctx, -1) + } if shardingReSync != nil { shardingReSync.currLocation.Position.Pos = e.Header.LogPos shardingReSync.currLocation.Suffix = currentLocation.Suffix diff --git a/dm/tests/_utils/check_sync_diff b/dm/tests/_utils/check_sync_diff index e4ed59f3245..2b7bcd17bdf 100755 --- a/dm/tests/_utils/check_sync_diff +++ b/dm/tests/_utils/check_sync_diff @@ -23,7 +23,7 @@ cd $workdir i=0 while [ $i -lt $check_time ]; do rm -rf $OUTPUT_DIR - $binary --config=$conf >>$LOG 2>&1 + $binary --config=$conf >$LOG 2>&1 ret=$? if [ "$ret" == 0 ]; then echo "check diff successfully" diff --git a/dm/tests/duplicate_event/run.sh b/dm/tests/duplicate_event/run.sh index f70b80c5805..6f78624f9f1 100644 --- a/dm/tests/duplicate_event/run.sh +++ b/dm/tests/duplicate_event/run.sh @@ -34,53 +34,7 @@ function run() { run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 check_log_contain_with_retry "reset replication binlog puller" $WORK_DIR/worker1/log/dm-worker.log - check_log_contain_with_retry "discard event already consumed" $WORK_DIR/worker1/log/dm-worker.log check_sync_diff $WORK_DIR $cur/conf/diff_config.toml - - # 2. test relay log retry relay with GTID - - # with a 5 rows insert txn: 1 * FormatDesc + 1 * PreviousGTID + 1 * GTID + 1 * BEGIN + 5 * (Table_map + Write_rows) + 1 * XID - # here we fail at the third write rows event, sync should retry and auto recover without any duplicate event - export GO_FAILPOINTS="github.com/pingcap/ticdc/dm/relay/RelayGetEventFailed=15*return(3);github.com/pingcap/ticdc/dm/relay/retry/RelayAllowRetry=return" - - run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml - check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT - - cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml - dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2 - - run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 - check_contains 'Query OK, 2 rows affected' - - run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "start-task $cur/conf/dm-task-relay.yaml --remove-meta" - check_metric $WORKER2_PORT "dm_worker_task_state{source_id=\"mysql-replica-02\",task=\"test_relay\",worker=\"worker2\"}" 10 1 3 - - check_sync_diff $WORK_DIR $cur/conf/diff_relay_config.toml - - run_sql_source2 "flush logs;" - run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "start-relay -s $SOURCE_ID2 worker2" \ - "\"result\": true" 1 - run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "query-status -s $SOURCE_ID2" \ - "\"relayCatchUpMaster\": true" 1 - - run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 - - check_log_contain_with_retry "retrying to read binlog" $WORK_DIR/worker2/log/dm-worker.log - check_log_contain_with_retry "discard duplicate event" $WORK_DIR/worker2/log/dm-worker.log - - check_sync_diff $WORK_DIR $cur/conf/diff_relay_config.toml - - # check relay log binlog file size is the same as master size - run_sql_source2 "show master status;" - binlog_file=$(grep "File" $TEST_DIR/sql_res.$TEST_NAME.txt | awk -F: '{print $2}' | xargs) - binlog_pos=$(grep "Position" $TEST_DIR/sql_res.$TEST_NAME.txt | awk -F: '{print $2}' | xargs) - - server_uuid=$(tail -n 1 $WORK_DIR/worker2/relay-dir/server-uuid.index) - relay_log_size=$(ls -al $WORK_DIR/worker2/relay-dir/$server_uuid/$binlog_file | awk '{print $5}') - [ "$binlog_pos" -eq "$relay_log_size" ] } # also cleanup dm processes in case of last run failed diff --git a/dm/tests/others_integration_2.txt b/dm/tests/others_integration_2.txt index eb7c5d37db7..c8959ae28bb 100644 --- a/dm/tests/others_integration_2.txt +++ b/dm/tests/others_integration_2.txt @@ -7,4 +7,5 @@ case_sensitive sql_mode http_proxies openapi +duplicate_event tracker_ignored_ddl From 73386ec412e72fc0b1769e64cd50dc9925ad31a4 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 15 Dec 2021 19:38:44 +0800 Subject: [PATCH 2/2] address comment --- dm/syncer/syncer.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go index aa63a10e4c9..5f3fed77845 100644 --- a/dm/syncer/syncer.go +++ b/dm/syncer/syncer.go @@ -1637,7 +1637,6 @@ func (s *Syncer) Run(ctx context.Context) (err error) { return err } log.L().Info("reset replication binlog puller", zap.Any("pos", s.checkpoint.GlobalPoint())) - _ = s.safeMode.Add(tctx, 1) inFinerRetry = true continue } @@ -1777,7 +1776,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { shardingReSync: shardingReSync, closeShardingResync: closeShardingResync, traceSource: traceSource, - safeMode: s.safeMode.Enable(), + safeMode: s.safeMode.Enable() || inFinerRetry, tryReSync: tryReSync, startTime: startTime, shardingReSyncCh: &shardingReSyncCh, @@ -1800,7 +1799,6 @@ func (s *Syncer) Run(ctx context.Context) (err error) { eventIndex = 0 if inFinerRetry { inFinerRetry = false - _ = s.safeMode.Add(tctx, -1) } if shardingReSync != nil { shardingReSync.currLocation.Position.Pos = e.Header.LogPos