Skip to content

Commit

Permalink
sink(ticdc): refine get tso (#6462) (#6789)
Browse files Browse the repository at this point in the history
close #6460
  • Loading branch information
ti-chi-bot committed Aug 18, 2022
1 parent 7faae9f commit 4134c7b
Showing 1 changed file with 16 additions and 2 deletions.
18 changes: 16 additions & 2 deletions cdc/processor/pipeline/sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/pipeline"
pmessage "github.com/pingcap/tiflow/pkg/pipeline/message"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
Expand Down Expand Up @@ -183,15 +184,28 @@ func (n *sorterNode) start(
}
}

phy, logic, err := n.pdClient.GetTS(ctx)
var replicateTs uint64
backoffBaseDelayInMs := int64(100)
totalRetryDuration := 10 * time.Second
start := time.Now()
err := retry.Do(stdCtx, func() error {
phy, logic, err := n.pdClient.GetTS(ctx)
if err != nil {
return errors.Trace(err)
}
replicateTs = oracle.ComposeTS(phy, logic)
return nil
}, retry.WithBackoffBaseDelay(backoffBaseDelayInMs),
retry.WithTotalRetryDuratoin(totalRetryDuration),
retry.WithIsRetryableErr(cerror.IsRetryableError))
if err != nil {
return errors.Trace(err)
}
replicateTs := oracle.ComposeTS(phy, logic)
log.Info("table is replicating",
zap.Int64("tableID", n.tableID),
zap.String("tableName", n.tableName),
zap.Uint64("replicateTs", replicateTs),
zap.Duration("duration", time.Since(start)),
zap.String("namespace", n.changefeed.Namespace),
zap.String("changefeed", n.changefeed.ID))

Expand Down

0 comments on commit 4134c7b

Please sign in to comment.