diff --git a/cdc/processor/pipeline/sorter.go b/cdc/processor/pipeline/sorter.go index 02e175ee140..b265f19423e 100644 --- a/cdc/processor/pipeline/sorter.go +++ b/cdc/processor/pipeline/sorter.go @@ -34,6 +34,7 @@ import ( cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/pipeline" pmessage "github.com/pingcap/tiflow/pkg/pipeline/message" + "github.com/pingcap/tiflow/pkg/retry" "github.com/tikv/client-go/v2/oracle" pd "github.com/tikv/pd/client" "go.uber.org/zap" @@ -183,15 +184,28 @@ func (n *sorterNode) start( } } - phy, logic, err := n.pdClient.GetTS(ctx) + var replicateTs uint64 + backoffBaseDelayInMs := int64(100) + totalRetryDuration := 10 * time.Second + start := time.Now() + err := retry.Do(stdCtx, func() error { + phy, logic, err := n.pdClient.GetTS(ctx) + if err != nil { + return errors.Trace(err) + } + replicateTs = oracle.ComposeTS(phy, logic) + return nil + }, retry.WithBackoffBaseDelay(backoffBaseDelayInMs), + retry.WithTotalRetryDuratoin(totalRetryDuration), + retry.WithIsRetryableErr(cerror.IsRetryableError)) if err != nil { return errors.Trace(err) } - replicateTs := oracle.ComposeTS(phy, logic) log.Info("table is replicating", zap.Int64("tableID", n.tableID), zap.String("tableName", n.tableName), zap.Uint64("replicateTs", replicateTs), + zap.Duration("duration", time.Since(start)), zap.String("namespace", n.changefeed.Namespace), zap.String("changefeed", n.changefeed.ID))