syncer(dm): use an early location to reset binlog and open safemode #3860

Merged (5 commits) on Dec 22, 2021
21 changes: 19 additions & 2 deletions dm/syncer/syncer.go
@@ -1550,6 +1550,9 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
return nil
}

inFinerRetry := false
// in release branch, we only use eventIndex to test a bug
eventIndex := 0
for {
if s.execError.Load() != nil {
return nil
@@ -1591,6 +1594,14 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
err = errors.New("connect: connection refused")
}
})
failpoint.Inject("GetEventErrorInTxn", func(val failpoint.Value) {
if intVal, ok := val.(int); ok && intVal == eventIndex {
err = errors.New("failpoint triggered")
s.tctx.L().Warn("failed to get event", zap.Int("event_index", eventIndex),
zap.Any("cur_pos", currentLocation), zap.Any("las_pos", lastLocation),
zap.Any("pos", e.Header.LogPos), log.ShortError(err))
}
})
switch {
case err == context.Canceled:
tctx.L().Info("binlog replication main routine quit(context canceled)!", zap.Stringer("last location", lastLocation))
@@ -1617,11 +1628,12 @@ func (s *Syncer) Run(ctx context.Context) (err error) {

if s.streamerController.CanRetry(err) {
// lastLocation is the last finished GTID
err = s.streamerController.ResetReplicationSyncer(tctx, lastLocation)
err = s.streamerController.ResetReplicationSyncer(tctx, s.checkpoint.GlobalPoint())
Contributor:

If this retry happens during the sharding-resync phase, enabling safe mode for only one transaction is not enough, because the start point may be reset to the very beginning. But since safe mode is enabled for the whole sharding-resync phase, this is OK. Maybe we can add a comment about this here.

Contributor:

What does "safe mode is enabled for the whole sharding-resync phase" mean? To me this is a problem.

Contributor Author @lance6716 commented on Dec 15, 2021.

Contributor:

Looks good to me
if err != nil {
return err
}
log.L().Info("reset replication binlog puller", zap.Any("pos", s.checkpoint.GlobalPoint()))
inFinerRetry = true
continue
}
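
This hunk is the heart of the change: when fetching a binlog event fails partway through a transaction and the error is retryable, the streamer is reset to the checkpoint's GlobalPoint (the last committed, flushed location) instead of lastLocation, and `inFinerRetry` forces safe mode on so the events replayed from that earlier point are applied idempotently until the next XID commits. A minimal sketch of that control flow, with simplified illustrative names (the `event`/`downstream` types and `applyRow` below are not DM APIs):

```go
package main

import (
	"errors"
	"fmt"
)

// event stands in for a binlog event; DM distinguishes many more kinds.
type event struct {
	kind string // "rows" or "xid"
	row  string
}

// downstream simulates the target database: applying the same row twice
// fails unless safe mode turns the write into an idempotent one.
type downstream struct {
	applied map[string]bool
}

func (d *downstream) applyRow(row string, safeMode bool) error {
	if d.applied[row] && !safeMode {
		return fmt.Errorf("duplicate entry %q", row)
	}
	d.applied[row] = true
	return nil
}

func main() {
	// one transaction: three rows events followed by an XID (commit)
	txn := []event{{"rows", "r1"}, {"rows", "r2"}, {"rows", "r3"}, {"xid", ""}}
	db := &downstream{applied: map[string]bool{}}

	inFinerRetry := false
	failOnce := true
	pos := 0 // index into txn; position 0 plays the role of the checkpoint's GlobalPoint

	for pos < len(txn) {
		// simulate a transient fetch error in the middle of the transaction
		if failOnce && pos == 2 {
			failOnce = false
			fmt.Println("get event failed:", errors.New("connection refused"),
				"-> reset to checkpoint, enable safe mode for this retry")
			pos = 0 // reset the binlog puller to the last committed location
			inFinerRetry = true
			continue
		}

		switch ev := txn[pos]; ev.kind {
		case "rows":
			// r1 and r2 are replayed after the reset; safe mode makes that harmless
			if err := db.applyRow(ev.row, inFinerRetry); err != nil {
				fmt.Println("apply failed:", err)
				return
			}
		case "xid":
			// the transaction is committed downstream; the finer retry ends here
			inFinerRetry = false
			fmt.Println("transaction committed, safe mode no longer forced")
		}
		pos++
	}
}
```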

@@ -1760,7 +1772,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
shardingReSync: shardingReSync,
closeShardingResync: closeShardingResync,
traceSource: traceSource,
safeMode: s.safeMode.Enable(),
safeMode: s.safeMode.Enable() || inFinerRetry,
tryReSync: tryReSync,
startTime: startTime,
shardingReSyncCh: &shardingReSyncCh,
@@ -1773,12 +1785,17 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
case *replication.RotateEvent:
err2 = s.handleRotateEvent(ev, ec)
case *replication.RowsEvent:
eventIndex++
metrics.BinlogEventRowHistogram.WithLabelValues(s.cfg.WorkerName, s.cfg.Name, s.cfg.SourceID).Observe(float64(len(ev.Rows)))
err2 = s.handleRowsEvent(ev, ec)
case *replication.QueryEvent:
originSQL = strings.TrimSpace(string(ev.Query))
err2 = s.handleQueryEvent(ev, ec, originSQL)
case *replication.XIDEvent:
eventIndex = 0
if inFinerRetry {
inFinerRetry = false
}
if shardingReSync != nil {
shardingReSync.currLocation.Position.Pos = e.Header.LogPos
shardingReSync.currLocation.Suffix = currentLocation.Suffix
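
For readers following the review thread above: DM's safe mode makes replayed DML idempotent, which is why `safeMode: s.safeMode.Enable() || inFinerRetry` is enough to protect the transaction that gets re-pulled from the checkpoint. A rough sketch of the usual rewrite rules (INSERT becomes REPLACE, UPDATE becomes DELETE plus REPLACE); the function below is illustrative, not DM's actual SQL generator, and it ignores schemas, multi-column keys, and escaping:

```go
package main

import "fmt"

// safeDML sketches how a row change is rewritten so that applying it twice
// gives the same result as applying it once.
func safeDML(op, table string, oldID, newID int, val string) []string {
	switch op {
	case "insert":
		// REPLACE overwrites an existing row instead of failing with a duplicate key
		return []string{fmt.Sprintf("REPLACE INTO %s (id, v) VALUES (%d, '%s')", table, newID, val)}
	case "update":
		// delete by the old key first, then write the new row image
		return []string{
			fmt.Sprintf("DELETE FROM %s WHERE id = %d", table, oldID),
			fmt.Sprintf("REPLACE INTO %s (id, v) VALUES (%d, '%s')", table, newID, val),
		}
	default: // delete is naturally idempotent
		return []string{fmt.Sprintf("DELETE FROM %s WHERE id = %d", table, oldID)}
	}
}

func main() {
	// Replaying this insert after a binlog reset is harmless under safe mode.
	for _, stmt := range safeDML("insert", "t", 0, 1, "a") {
		fmt.Println(stmt)
	}
}
```

Whether one retried transaction is enough in the sharding-resync case, where the reset point can be much earlier, is exactly what the reviewers are debating above.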
2 changes: 1 addition & 1 deletion dm/tests/_utils/check_sync_diff
@@ -23,7 +23,7 @@ cd $workdir
i=0
while [ $i -lt $check_time ]; do
rm -rf $OUTPUT_DIR
$binary --config=$conf >>$LOG 2>&1
$binary --config=$conf >$LOG 2>&1
ret=$?
if [ "$ret" == 0 ]; then
echo "check diff successfully"
46 changes: 0 additions & 46 deletions dm/tests/duplicate_event/run.sh
@@ -34,53 +34,7 @@ function run() {
run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1

check_log_contain_with_retry "reset replication binlog puller" $WORK_DIR/worker1/log/dm-worker.log
check_log_contain_with_retry "discard event already consumed" $WORK_DIR/worker1/log/dm-worker.log
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

# 2. test relay log retry relay with GTID

# with a 5 rows insert txn: 1 * FormatDesc + 1 * PreviousGTID + 1 * GTID + 1 * BEGIN + 5 * (Table_map + Write_rows) + 1 * XID
# here we fail at the third write rows event, sync should retry and auto recover without any duplicate event
export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/relay/RelayGetEventFailed=15*return(3);github.com/pingcap/tiflow/dm/relay/retry/RelayAllowRetry=return"

run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT

cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml
dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2

run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
check_contains 'Query OK, 2 rows affected'

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"start-task $cur/conf/dm-task-relay.yaml --remove-meta"
check_metric $WORKER2_PORT "dm_worker_task_state{source_id=\"mysql-replica-02\",task=\"test_relay\",worker=\"worker2\"}" 10 1 3

check_sync_diff $WORK_DIR $cur/conf/diff_relay_config.toml

run_sql_source2 "flush logs;"
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"start-relay -s $SOURCE_ID2 worker2" \
"\"result\": true" 1
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status -s $SOURCE_ID2" \
"\"relayCatchUpMaster\": true" 1

run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2

check_log_contain_with_retry "retrying to read binlog" $WORK_DIR/worker2/log/dm-worker.log
check_log_contain_with_retry "discard duplicate event" $WORK_DIR/worker2/log/dm-worker.log

check_sync_diff $WORK_DIR $cur/conf/diff_relay_config.toml

# check relay log binlog file size is the same as master size
run_sql_source2 "show master status;"
binlog_file=$(grep "File" $TEST_DIR/sql_res.$TEST_NAME.txt | awk -F: '{print $2}' | xargs)
binlog_pos=$(grep "Position" $TEST_DIR/sql_res.$TEST_NAME.txt | awk -F: '{print $2}' | xargs)

server_uuid=$(tail -n 1 $WORK_DIR/worker2/relay-dir/server-uuid.index)
relay_log_size=$(ls -al $WORK_DIR/worker2/relay-dir/$server_uuid/$binlog_file | awk '{print $5}')
[ "$binlog_pos" -eq "$relay_log_size" ]
}

# also cleanup dm processes in case of last run failed
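
The deleted relay test ended by asserting that the relay copy of the current binlog file has exactly the byte size that `SHOW MASTER STATUS` reports as the master position, i.e. that the retry wrote no duplicate events into the relay log. The same check, expressed as a hedged Go sketch (paths and the master position are placeholder values; the real test derived them from `sql_res.$TEST_NAME.txt` and `server-uuid.index`):

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// relaySizeMatchesMaster reports whether the relay-dir copy of binlogFile is
// exactly masterPos bytes long, mirroring the deleted `ls -al | awk '{print $5}'` check.
func relaySizeMatchesMaster(relayDir, serverUUID, binlogFile string, masterPos int64) (bool, error) {
	info, err := os.Stat(filepath.Join(relayDir, serverUUID, binlogFile))
	if err != nil {
		return false, err
	}
	return info.Size() == masterPos, nil
}

func main() {
	// placeholder inputs; the worker's relay layout is relay-dir/<server-uuid>/<binlog file>
	ok, err := relaySizeMatchesMaster(
		"/tmp/dm_test/duplicate_event/worker2/relay-dir",
		"ffffffff-0000-0000-0000-000000000000.000001",
		"mysql-bin.000002",
		4096,
	)
	if err != nil {
		fmt.Println("stat relay log failed:", err)
		return
	}
	fmt.Println("relay log size matches master position:", ok)
}
```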
1 change: 1 addition & 0 deletions dm/tests/others_integration_2.txt
@@ -7,4 +7,5 @@ case_sensitive
sql_mode
http_proxies
openapi
duplicate_event
tracker_ignored_ddl