*: fix changefeed checkpoint lag negative value error (#3013) #3536

Merged
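This pull request fixes the changefeed checkpoint lag metric reporting negative values. The lag gauges used to compare a PD-issued checkpoint timestamp against the capture's local clock via time.Now(), so a local clock running behind PD's TSO drove the subtraction below zero. The fix introduces a pdtime.TimeAcquirer that periodically fetches the time from PD and caches it, and the gauges now compare against that cached PD time. A minimal sketch of the resulting computation, assuming a hypothetical helper name and the cdcContext and oracle (tikv/client-go/v2/oracle) packages used in the diff below:

// checkpointLagSeconds is a hypothetical helper; phyTs is the checkpoint's
// physical time in milliseconds, as in updateStatus below.
func checkpointLagSeconds(ctx cdcContext.Context, phyTs int64) float64 {
	// Before this patch the lag was float64(oracle.GetPhysical(time.Now())-phyTs) / 1e3,
	// which goes negative when the local clock runs behind PD.
	pdTime, _ := ctx.GlobalVars().TimeAcquirer.CurrentTimeFromCached()
	return float64(oracle.GetPhysical(pdTime)-phyTs) / 1e3
}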
33 changes: 23 additions & 10 deletions cdc/capture/capture.go
@@ -33,6 +33,7 @@ import (
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/etcd"
"github.com/pingcap/ticdc/pkg/orchestrator"
"github.com/pingcap/ticdc/pkg/pdtime"
"github.com/pingcap/ticdc/pkg/version"
tidbkv "github.com/pingcap/tidb/kv"
pd "github.com/tikv/pd/client"
@@ -55,10 +56,11 @@ type Capture struct {
session *concurrency.Session
election *concurrency.Election

pdClient pd.Client
kvStorage tidbkv.Storage
etcdClient *etcd.CDCEtcdClient
grpcPool kv.GrpcPool
pdClient pd.Client
kvStorage tidbkv.Storage
etcdClient *etcd.CDCEtcdClient
grpcPool kv.GrpcPool
TimeAcquirer pdtime.TimeAcquirer

cancel context.CancelFunc

@@ -100,6 +102,12 @@ func (c *Capture) reset(ctx context.Context) error {
}
c.session = sess
c.election = concurrency.NewElection(sess, etcd.CaptureOwnerKey)

if c.TimeAcquirer != nil {
c.TimeAcquirer.Stop()
}
c.TimeAcquirer = pdtime.NewTimeAcquirer(c.pdClient)

if c.grpcPool != nil {
c.grpcPool.Close()
}
@@ -148,11 +156,12 @@ func (c *Capture) Run(ctx context.Context) error {

func (c *Capture) run(stdCtx context.Context) error {
ctx := cdcContext.NewContext(stdCtx, &cdcContext.GlobalVars{
PDClient: c.pdClient,
KVStorage: c.kvStorage,
CaptureInfo: c.info,
EtcdClient: c.etcdClient,
GrpcPool: c.grpcPool,
PDClient: c.pdClient,
KVStorage: c.kvStorage,
CaptureInfo: c.info,
EtcdClient: c.etcdClient,
GrpcPool: c.grpcPool,
TimeAcquirer: c.TimeAcquirer,
})
err := c.register(ctx)
if err != nil {
@@ -166,7 +175,7 @@ func (c *Capture) run(stdCtx context.Context) error {
cancel()
}()
wg := new(sync.WaitGroup)
wg.Add(3)
wg.Add(4)
var ownerErr, processorErr error
go func() {
defer wg.Done()
@@ -188,6 +197,10 @@ func (c *Capture) run(stdCtx context.Context) error {
processorErr = c.runEtcdWorker(ctx, c.processorManager, orchestrator.NewGlobalState(), processorFlushInterval)
log.Info("the processor routine has exited", zap.Error(processorErr))
}()
go func() {
defer wg.Done()
c.TimeAcquirer.Run(ctx)
}()
go func() {
defer wg.Done()
c.grpcPool.RecycleConn(ctx)
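With this change the capture owns the acquirer's lifecycle: reset stops any previous instance and builds a fresh one from the PD client, and run registers it as a fourth background routine, hence the change from wg.Add(3) to wg.Add(4). A condensed sketch of that pattern, with the owner, processor, and gRPC-pool routines elided (pdClient and ctx are assumed to be in scope):

ta := pdtime.NewTimeAcquirer(pdClient)
wg := new(sync.WaitGroup)
wg.Add(1)
go func() {
	defer wg.Done()
	ta.Run(ctx) // returns once ctx is canceled or ta.Stop() is called
}()
// ... the other three routines are started the same way ...
wg.Wait()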
13 changes: 6 additions & 7 deletions cdc/owner/changefeed.go
@@ -16,7 +16,6 @@ package owner
import (
"context"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
@@ -184,7 +183,9 @@ func (c *changefeed) tick(ctx cdcContext.Context, state *orchestrator.Changefeed
return errors.Trace(err)
}
if shouldUpdateState {
c.updateStatus(barrierTs)
pdTime, _ := ctx.GlobalVars().TimeAcquirer.CurrentTimeFromCached()
currentTs := oracle.GetPhysical(pdTime)
c.updateStatus(currentTs, barrierTs)
}
return nil
}
@@ -464,7 +465,7 @@ func (c *changefeed) asyncExecDDL(ctx cdcContext.Context, job *timodel.Job) (don
return done, nil
}

func (c *changefeed) updateStatus(barrierTs model.Ts) {
func (c *changefeed) updateStatus(currentTs int64, barrierTs model.Ts) {
resolvedTs := barrierTs
for _, position := range c.state.TaskPositions {
if resolvedTs > position.ResolvedTs {
@@ -496,12 +497,10 @@ }
}
return status, changed, nil
})

phyTs := oracle.ExtractPhysical(checkpointTs)

c.metricsChangefeedCheckpointTsGauge.Set(float64(phyTs))
// It is more accurate to get tso from PD, but in most cases since we have
// deployed NTP service, a little bias is acceptable here.
c.metricsChangefeedCheckpointTsLagGauge.Set(float64(oracle.GetPhysical(time.Now())-phyTs) / 1e3)
c.metricsChangefeedCheckpointTsLagGauge.Set(float64(currentTs-phyTs) / 1e3)
}

func (c *changefeed) Close(ctx context.Context) {
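On units: oracle.ExtractPhysical and oracle.GetPhysical both yield milliseconds since the Unix epoch, so dividing their difference by 1e3 produces a lag in seconds. A worked sketch with illustrative numbers:

phyTs := int64(1_700_000_000_000)            // checkpoint physical time, ms
currentTs := int64(1_700_000_004_500)        // cached PD time, ms
lagSeconds := float64(currentTs-phyTs) / 1e3 // 4.5, non-negative as long as
// PD's clock is ahead of the checkpoint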
2 changes: 2 additions & 0 deletions cdc/owner/changefeed_test.go
@@ -25,6 +25,7 @@ import (
"github.com/pingcap/ticdc/pkg/config"
cdcContext "github.com/pingcap/ticdc/pkg/context"
"github.com/pingcap/ticdc/pkg/orchestrator"
"github.com/pingcap/ticdc/pkg/pdtime"
"github.com/pingcap/ticdc/pkg/txnutil/gc"
"github.com/pingcap/ticdc/pkg/util/testleak"
"github.com/pingcap/ticdc/pkg/version"
@@ -216,6 +217,7 @@ func (s *changefeedSuite) TestExecDDL(c *check.C) {
AdvertiseAddr: "127.0.0.1:0000",
Version: version.ReleaseVersion,
},
TimeAcquirer: pdtime.NewTimeAcquirer4Test(),
})
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: "changefeed-id-test",
1 change: 1 addition & 0 deletions cdc/processor/manager.go
@@ -70,6 +70,7 @@ func (m *Manager) Tick(stdCtx context.Context, state orchestrator.ReactorState)
if err := m.handleCommand(); err != nil {
return state, err
}

captureID := ctx.GlobalVars().CaptureInfo.ID
var inactiveChangefeedCount int
for changefeedID, changefeedState := range globalState.Changefeeds {
27 changes: 14 additions & 13 deletions cdc/processor/processor.go
@@ -159,7 +159,8 @@ func (p *processor) tick(ctx cdcContext.Context, state *orchestrator.ChangefeedR
if !p.checkChangefeedNormal() {
return nil, cerror.ErrAdminStopProcessor.GenWithStackByArgs()
}
if skip := p.checkPosition(); skip {
// we should skip this tick after creating a task position
if p.createTaskPosition() {
return p.changefeed, nil
}
if err := p.handleErrorCh(ctx); err != nil {
@@ -177,7 +178,11 @@
if err := p.flushRedoLogMeta(ctx); err != nil {
return nil, err
}
p.handlePosition()
// there is no need to check the error here, because we will use
// the local time when an error is returned, which is acceptable
pdTime, _ := ctx.GlobalVars().TimeAcquirer.CurrentTimeFromCached()

p.handlePosition(oracle.GetPhysical(pdTime))
p.pushResolvedTs2Table()
p.handleWorkload()
p.doGCSchemaStorage(ctx)
@@ -195,10 +200,10 @@
return true
}

// checkPosition create a new task position, and put it into the etcd state.
// task position maybe be not exist only when the processor is running first time.
func (p *processor) checkPosition() (skipThisTick bool) {
if p.changefeed.TaskPositions[p.captureInfo.ID] != nil {
// createTaskPosition will create a new task position if one does not exist.
// A task position is missing only on the processor's first tick.
func (p *processor) createTaskPosition() (skipThisTick bool) {
if _, exist := p.changefeed.TaskPositions[p.captureInfo.ID]; exist {
return false
}
if p.initialized {
@@ -559,7 +564,7 @@ func (p *processor) checkTablesNum(ctx cdcContext.Context) error {
}

// handlePosition calculates the local resolved ts and local checkpoint ts
func (p *processor) handlePosition() {
func (p *processor) handlePosition(currentTs int64) {
minResolvedTs := uint64(math.MaxUint64)
if p.schemaStorage != nil {
minResolvedTs = p.schemaStorage.ResolvedTs()
@@ -580,15 +585,11 @@ }
}

resolvedPhyTs := oracle.ExtractPhysical(minResolvedTs)
// It is more accurate to get tso from PD, but in most cases we have
// deployed NTP service, a little bias is acceptable here.
p.metricResolvedTsLagGauge.Set(float64(oracle.GetPhysical(time.Now())-resolvedPhyTs) / 1e3)
p.metricResolvedTsLagGauge.Set(float64(currentTs-resolvedPhyTs) / 1e3)
p.metricResolvedTsGauge.Set(float64(resolvedPhyTs))

checkpointPhyTs := oracle.ExtractPhysical(minCheckpointTs)
// It is more accurate to get tso from PD, but in most cases we have
// deployed NTP service, a little bias is acceptable here.
p.metricCheckpointTsLagGauge.Set(float64(oracle.GetPhysical(time.Now())-checkpointPhyTs) / 1e3)
p.metricCheckpointTsLagGauge.Set(float64(currentTs-checkpointPhyTs) / 1e3)
p.metricCheckpointTsGauge.Set(float64(checkpointPhyTs))

// minResolvedTs and minCheckpointTs may be less than the global resolved ts and global checkpoint ts when a new table is added, because the startTs of the new table can be less than the global checkpoint ts.
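Both processor gauges rely on TiDB's TSO layout: the physical time in milliseconds occupies the high bits of a timestamp and an 18-bit logical counter the low bits, which is also how acquirer.go (below) turns PD's physical time back into a time.Time. A sketch of the oracle helpers involved, with illustrative values:

physical := int64(1_700_000_000_000) // milliseconds since the Unix epoch
ts := oracle.ComposeTS(physical, 0)  // physical shifted left 18 bits, plus logical
_ = oracle.ExtractPhysical(ts)       // recovers the millisecond value
t := oracle.GetTimeFromTS(ts)        // the same instant as a time.Time
_ = oracle.GetPhysical(t)            // back to milliseconds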
13 changes: 8 additions & 5 deletions pkg/context/context.go
@@ -22,6 +22,7 @@ import (
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/etcd"
"github.com/pingcap/ticdc/pkg/pdtime"
"github.com/pingcap/ticdc/pkg/version"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/tikv/client-go/v2/oracle"
@@ -33,11 +34,12 @@
// the lifecycle of vars in the GlobalVars should be aligned with the ticdc server process.
// All field in Vars should be READ-ONLY and THREAD-SAFE
type GlobalVars struct {
PDClient pd.Client
KVStorage tidbkv.Storage
CaptureInfo *model.CaptureInfo
EtcdClient *etcd.CDCEtcdClient
GrpcPool kv.GrpcPool
PDClient pd.Client
KVStorage tidbkv.Storage
CaptureInfo *model.CaptureInfo
EtcdClient *etcd.CDCEtcdClient
GrpcPool kv.GrpcPool
TimeAcquirer pdtime.TimeAcquirer
}

// ChangefeedVars contains some vars which can be used anywhere in a pipeline
@@ -184,6 +186,7 @@ func NewBackendContext4Test(withChangefeedVars bool) Context {
AdvertiseAddr: "127.0.0.1:0000",
Version: version.ReleaseVersion,
},
TimeAcquirer: pdtime.NewTimeAcquirer4Test(),
})
if withChangefeedVars {
ctx = WithChangefeedVars(ctx, &ChangefeedVars{
6 changes: 4 additions & 2 deletions pkg/orchestrator/etcd_worker_test.go
@@ -18,6 +18,7 @@ import (
"encoding/json"
"regexp"
"strconv"
"strings"
"sync"
"testing"
"time"
@@ -223,7 +224,6 @@ func (s *etcdWorkerSuite) TestEtcdSum(c *check.C) {
defer func() {
_ = cli.Unwrap().Close()
}()

_, err := cli.Put(ctx, testEtcdKeyPrefix+"/sum", "0")
c.Check(err, check.IsNil)

@@ -272,7 +272,9 @@ }
}

err = errg.Wait()
if err != nil && (errors.Cause(err) == context.DeadlineExceeded || errors.Cause(err) == context.Canceled) {
if err != nil && (errors.Cause(err) == context.DeadlineExceeded ||
errors.Cause(err) == context.Canceled ||
strings.Contains(err.Error(), "etcdserver: request timeout")) {
return
}
c.Check(err, check.IsNil)
118 changes: 118 additions & 0 deletions pkg/pdtime/acquirer.go
@@ -0,0 +1,118 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package pdtime

import (
"context"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/retry"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
)

const pdTimeUpdateInterval = 200 * time.Millisecond

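// TimeAcquirer acquires the current time from PD and caches it for fast reads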
type TimeAcquirer interface {
// Run runs the TimeAcquirer
Run(ctx context.Context)
// CurrentTimeFromCached returns the current time from the cache
CurrentTimeFromCached() (time.Time, error)
// Stop stops the TimeAcquirer
Stop()
}

// TimeAcquirerImpl periodically fetches the time from PD and caches it
type TimeAcquirerImpl struct {
pdClient pd.Client
timeCache time.Time
mu sync.RWMutex
cancel context.CancelFunc
err error
}

// NewTimeAcquirer returns a new TimeAcquirer
func NewTimeAcquirer(pdClient pd.Client) TimeAcquirer {
return &TimeAcquirerImpl{
pdClient: pdClient,
}
}

// Run periodically fetches the time from PD and stores it in the time cache
func (c *TimeAcquirerImpl) Run(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
c.cancel = cancel
ticker := time.NewTicker(pdTimeUpdateInterval)
defer ticker.Stop()
for {
select {
// c.Stop() was called or parent ctx was canceled
case <-ctx.Done():
log.Info("TimeAcquirer exit")
return
case <-ticker.C:
err := retry.Do(ctx, func() error {
physical, _, err := c.pdClient.GetTS(ctx)
if err != nil {
log.Info("get time from pd failed, retry later", zap.Error(err))
return err
}
c.mu.Lock()
c.timeCache = oracle.GetTimeFromTS(oracle.ComposeTS(physical, 0))
c.err = nil
c.mu.Unlock()
return nil
}, retry.WithBackoffBaseDelay(200), retry.WithMaxTries(10))
if err != nil {
log.Warn("get time from pd failed, will use local time as pd time")
c.mu.Lock()
c.timeCache = time.Now()
c.err = err
c.mu.Unlock()
}
}
}
}

// CurrentTimeFromCached returns the current time from the PD time cache
func (c *TimeAcquirerImpl) CurrentTimeFromCached() (time.Time, error) {
c.mu.RLock()
err := c.err
cacheTime := c.timeCache
c.mu.RUnlock()
return cacheTime, errors.Trace(err)
}

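// Stop cancels the context created in Run, stopping the refresh loop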
func (c *TimeAcquirerImpl) Stop() {
c.cancel()
}

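// TimeAcquirer4Test is a TimeAcquirer stub for tests; it always returns the local time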
type TimeAcquirer4Test struct{}

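// NewTimeAcquirer4Test returns a TimeAcquirer for test purposes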
func NewTimeAcquirer4Test() TimeAcquirer {
return &TimeAcquirer4Test{}
}

func (c *TimeAcquirer4Test) CurrentTimeFromCached() (time.Time, error) {
return time.Now(), nil
}

func (c *TimeAcquirer4Test) Run(ctx context.Context) {
}

func (c *TimeAcquirer4Test) Stop() {
}
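Putting the package together, a minimal usage sketch (construction of pdClient is elided, and the error handling mirrors the fallback behavior above):

ta := pdtime.NewTimeAcquirer(pdClient)
go ta.Run(ctx) // refreshes the cache from PD every 200ms

pdTime, err := ta.CurrentTimeFromCached()
if err != nil {
	// Run already fell back to the local clock after exhausting its
	// retries; callers such as processor.tick accept that bias.
}
currentTs := oracle.GetPhysical(pdTime) // physical milliseconds for metrics
_ = currentTs

ta.Stop() // cancels the context created in Run, ending the refresh loop

Note that, as written, the cache holds the zero time.Time until the first 200ms tick fires, so a reader in that window would see a zero value; the capture starts the acquirer before the owner and processor loops read from it.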