From 4134c7ba5e386ccb1fcdfee5e0dfd16e3985cccb Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Thu, 18 Aug 2022 12:56:52 +0800 Subject: [PATCH] sink(ticdc): refine get tso (#6462) (#6789) close pingcap/tiflow#6460 --- cdc/processor/pipeline/sorter.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/cdc/processor/pipeline/sorter.go b/cdc/processor/pipeline/sorter.go index 02e175ee140..b265f19423e 100644 --- a/cdc/processor/pipeline/sorter.go +++ b/cdc/processor/pipeline/sorter.go @@ -34,6 +34,7 @@ import ( cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/pipeline" pmessage "github.com/pingcap/tiflow/pkg/pipeline/message" + "github.com/pingcap/tiflow/pkg/retry" "github.com/tikv/client-go/v2/oracle" pd "github.com/tikv/pd/client" "go.uber.org/zap" @@ -183,15 +184,28 @@ func (n *sorterNode) start( } } - phy, logic, err := n.pdClient.GetTS(ctx) + var replicateTs uint64 + backoffBaseDelayInMs := int64(100) + totalRetryDuration := 10 * time.Second + start := time.Now() + err := retry.Do(stdCtx, func() error { + phy, logic, err := n.pdClient.GetTS(ctx) + if err != nil { + return errors.Trace(err) + } + replicateTs = oracle.ComposeTS(phy, logic) + return nil + }, retry.WithBackoffBaseDelay(backoffBaseDelayInMs), + retry.WithTotalRetryDuratoin(totalRetryDuration), + retry.WithIsRetryableErr(cerror.IsRetryableError)) if err != nil { return errors.Trace(err) } - replicateTs := oracle.ComposeTS(phy, logic) log.Info("table is replicating", zap.Int64("tableID", n.tableID), zap.String("tableName", n.tableName), zap.Uint64("replicateTs", replicateTs), + zap.Duration("duration", time.Since(start)), zap.String("namespace", n.changefeed.Namespace), zap.String("changefeed", n.changefeed.ID))