*/: add pd write frequency control from cdc server parameter #937

Merged: 6 commits, Sep 9, 2020
20 changes: 17 additions & 3 deletions cdc/capture.go
@@ -38,6 +38,11 @@ const (
captureSessionTTL = 3
)

// processorOpts records options for processor
type processorOpts struct {
flushCheckpointInterval time.Duration
}

// Capture represents a Capture server, it monitors the changefeed information in etcd and schedules Task on it.
type Capture struct {
etcdClient kv.CDCEtcdClient
@@ -51,10 +56,18 @@ type Capture struct {
// session keeps alive between the capture and etcd
session *concurrency.Session
election *concurrency.Election

opts *processorOpts
}

// NewCapture returns a new Capture instance
func NewCapture(ctx context.Context, pdEndpoints []string, credential *security.Credential, advertiseAddr string) (c *Capture, err error) {
func NewCapture(
ctx context.Context,
pdEndpoints []string,
credential *security.Credential,
advertiseAddr string,
opts *processorOpts,
) (c *Capture, err error) {
tlsConfig, err := credential.ToTLSConfig()
if err != nil {
return nil, errors.Trace(err)
@@ -91,7 +104,7 @@ func NewCapture(ctx context.Context, pdEndpoints []string, credential *security.
return nil, errors.Annotate(cerror.WrapError(cerror.ErrNewCaptureFailed, err), "create capture session")
}
elec := concurrency.NewElection(sess, kv.CaptureOwnerKey)
cli := kv.NewCDCEtcdClient(etcdCli)
cli := kv.NewCDCEtcdClient(ctx, etcdCli)
id := uuid.New().String()
info := &model.CaptureInfo{
ID: id,
Expand All @@ -107,6 +120,7 @@ func NewCapture(ctx context.Context, pdEndpoints []string, credential *security.
session: sess,
election: elec,
info: info,
opts: opts,
}

return
@@ -224,7 +238,7 @@ func (c *Capture) assignTask(ctx context.Context, task *Task) (*processor, error
zap.String("changefeedid", task.ChangeFeedID))

p, err := runProcessor(
ctx, c.credential, c.session, *cf, task.ChangeFeedID, *c.info, task.CheckpointTS)
ctx, c.credential, c.session, *cf, task.ChangeFeedID, *c.info, task.CheckpointTS, c.opts.flushCheckpointInterval)
if err != nil {
log.Error("run processor failed",
zap.String("changefeedid", task.ChangeFeedID),
15 changes: 13 additions & 2 deletions cdc/kv/etcd.go
@@ -24,6 +24,8 @@ import (
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/etcd"
"github.com/pingcap/ticdc/pkg/retry"
"github.com/pingcap/ticdc/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/concurrency"
"go.etcd.io/etcd/embed"
@@ -111,8 +113,17 @@ type CDCEtcdClient struct {
}

//NewCDCEtcdClient returns a new CDCEtcdClient
func NewCDCEtcdClient(cli *clientv3.Client) CDCEtcdClient {
return CDCEtcdClient{Client: etcd.Wrap(cli)}
func NewCDCEtcdClient(ctx context.Context, cli *clientv3.Client) CDCEtcdClient {
captureAddr := util.CaptureAddrFromCtx(ctx)
metrics := map[string]prometheus.Counter{
etcd.EtcdPut: etcdRequestCounter.WithLabelValues(etcd.EtcdPut, captureAddr),
etcd.EtcdGet: etcdRequestCounter.WithLabelValues(etcd.EtcdGet, captureAddr),
etcd.EtcdDel: etcdRequestCounter.WithLabelValues(etcd.EtcdDel, captureAddr),
etcd.EtcdTxn: etcdRequestCounter.WithLabelValues(etcd.EtcdTxn, captureAddr),
etcd.EtcdGrant: etcdRequestCounter.WithLabelValues(etcd.EtcdGrant, captureAddr),
etcd.EtcdRevoke: etcdRequestCounter.WithLabelValues(etcd.EtcdRevoke, captureAddr),
}

Review comment on lines +118 to +125 (Member):
IMO, it's better to put the map in package pkg/etcd, and pass the capture address to Wrap.

Author reply (Contributor):
The metrics have the CDC-specific label capture, so I think it is better to put the metrics map in the upper caller.

return CDCEtcdClient{Client: etcd.Wrap(cli, metrics)}
}

// ClearAllCDCInfo delete all keys created by CDC
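The inline review thread above debates where the operation-to-counter map should live. As a point of reference, here is a minimal sketch of a counting wrapper in the spirit of etcd.Wrap(cli, metrics); the package name, operation constants, and method set are illustrative assumptions, not the actual pkg/etcd implementation:

```go
package etcdwrap // illustrative package, not part of the PR

import (
	"context"

	"github.com/prometheus/client_golang/prometheus"
	"go.etcd.io/etcd/clientv3"
)

// Operation names; in the PR the real keys are constants in pkg/etcd
// (etcd.EtcdPut, etcd.EtcdGet, ...).
const (
	opPut = "Put"
	opGet = "Get"
)

// Client forwards calls to the underlying etcd client and counts each
// request by operation type.
type Client struct {
	cli     *clientv3.Client
	metrics map[string]prometheus.Counter
}

// Wrap mirrors the shape of etcd.Wrap(cli, metrics) used in NewCDCEtcdClient.
func Wrap(cli *clientv3.Client, metrics map[string]prometheus.Counter) *Client {
	return &Client{cli: cli, metrics: metrics}
}

func (c *Client) count(op string) {
	if counter, ok := c.metrics[op]; ok {
		counter.Inc()
	}
}

// Put bumps the "Put" counter, then forwards to the real client.
func (c *Client) Put(ctx context.Context, key, val string, opts ...clientv3.OpOption) (*clientv3.PutResponse, error) {
	c.count(opPut)
	return c.cli.Put(ctx, key, val, opts...)
}

// Get bumps the "Get" counter, then forwards to the real client.
func (c *Client) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
	c.count(opGet)
	return c.cli.Get(ctx, key, opts...)
}
```

Keeping the map in the caller, as the author argues, lets the CDC-specific capture label stay out of the generic wrapper package.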
2 changes: 1 addition & 1 deletion cdc/kv/etcd_test.go
@@ -54,7 +54,7 @@ func (s *etcdSuite) SetUpTest(c *check.C) {
DialTimeout: 3 * time.Second,
})
c.Assert(err, check.IsNil)
s.client = NewCDCEtcdClient(client)
s.client = NewCDCEtcdClient(context.TODO(), client)
s.ctx, s.cancel = context.WithCancel(context.Background())
s.errg = util.HandleErrWithErrGroup(s.ctx, s.e.Err(), func(e error) { c.Log(e) })
}
8 changes: 8 additions & 0 deletions cdc/kv/metrics.go
@@ -67,6 +67,13 @@ var (
Name: "channel_size",
Help: "size of each channel in kv client",
}, []string{"id", "channel"})
etcdRequestCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "ticdc",
Subsystem: "etcd",
Name: "request_count",
Help: "request counter of etcd operation",
}, []string{"type", "capture"})
)

// InitMetrics registers all metrics in the kv package
@@ -78,4 +85,5 @@ func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(pullEventCounter)
registry.MustRegister(sendEventCounter)
registry.MustRegister(clientChannelSize)
registry.MustRegister(etcdRequestCounter)
}
52 changes: 33 additions & 19 deletions cdc/owner.go
@@ -80,6 +80,9 @@ type Owner struct {
gcTTL int64
// last update gc safepoint time. zero time means has not updated or cleared
gcSafepointLastUpdate time.Time
// record last time that flushes all changefeeds' replication status
lastFlushChangefeeds time.Time
flushChangefeedInterval time.Duration
}

const (
@@ -90,25 +93,33 @@ const (
)

// NewOwner creates a new Owner instance
func NewOwner(pdClient pd.Client, credential *security.Credential, sess *concurrency.Session, gcTTL int64) (*Owner, error) {
cli := kv.NewCDCEtcdClient(sess.Client())
func NewOwner(
ctx context.Context,
pdClient pd.Client,
credential *security.Credential,
sess *concurrency.Session,
gcTTL int64,
flushChangefeedInterval time.Duration,
) (*Owner, error) {
cli := kv.NewCDCEtcdClient(ctx, sess.Client())
endpoints := sess.Client().Endpoints()

owner := &Owner{
done: make(chan struct{}),
session: sess,
pdClient: pdClient,
credential: credential,
changeFeeds: make(map[model.ChangeFeedID]*changeFeed),
failInitFeeds: make(map[model.ChangeFeedID]struct{}),
stoppedFeeds: make(map[model.ChangeFeedID]*model.ChangeFeedStatus),
captures: make(map[model.CaptureID]*model.CaptureInfo),
rebalanceTigger: make(map[model.ChangeFeedID]bool),
manualScheduleCommand: make(map[model.ChangeFeedID][]*model.MoveTableJob),
pdEndpoints: endpoints,
cfRWriter: cli,
etcdClient: cli,
gcTTL: gcTTL,
done: make(chan struct{}),
session: sess,
pdClient: pdClient,
credential: credential,
changeFeeds: make(map[model.ChangeFeedID]*changeFeed),
failInitFeeds: make(map[model.ChangeFeedID]struct{}),
stoppedFeeds: make(map[model.ChangeFeedID]*model.ChangeFeedStatus),
captures: make(map[model.CaptureID]*model.CaptureInfo),
rebalanceTigger: make(map[model.ChangeFeedID]bool),
manualScheduleCommand: make(map[model.ChangeFeedID][]*model.MoveTableJob),
pdEndpoints: endpoints,
cfRWriter: cli,
etcdClient: cli,
gcTTL: gcTTL,
flushChangefeedInterval: flushChangefeedInterval,
}

return owner, nil
@@ -569,9 +580,12 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error {
minCheckpointTs = changefeed.status.CheckpointTs
}
}
err := o.cfRWriter.PutAllChangeFeedStatus(ctx, snapshot)
if err != nil {
return errors.Trace(err)
if time.Since(o.lastFlushChangefeeds) > o.flushChangefeedInterval {
err := o.cfRWriter.PutAllChangeFeedStatus(ctx, snapshot)
if err != nil {
return errors.Trace(err)
}
o.lastFlushChangefeeds = time.Now()
}
}
for _, status := range o.stoppedFeeds {
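The key change in flushChangeFeedInfos above is that PutAllChangeFeedStatus now runs at most once per flushChangefeedInterval instead of on every iteration. A standalone sketch of that time-based throttling pattern (type and function names here are illustrative, not taken from the PR):

```go
package main

import (
	"fmt"
	"time"
)

// throttledFlusher shows the pattern the owner now uses: write to etcd/PD at
// most once per interval, tracking the time of the last successful flush.
type throttledFlusher struct {
	lastFlush time.Time
	interval  time.Duration
}

// maybeFlush runs fn only when the interval has elapsed since the previous
// successful flush; otherwise the call is skipped.
func (f *throttledFlusher) maybeFlush(fn func() error) error {
	if time.Since(f.lastFlush) <= f.interval {
		return nil
	}
	if err := fn(); err != nil {
		return err
	}
	f.lastFlush = time.Now()
	return nil
}

func main() {
	f := &throttledFlusher{interval: 200 * time.Millisecond}
	for i := 0; i < 5; i++ {
		_ = f.maybeFlush(func() error {
			fmt.Println("flushed at", time.Now().Format(time.StampMilli))
			return nil
		})
		time.Sleep(80 * time.Millisecond)
	}
}
```

With a 200ms interval and an 80ms tick, only some ticks trigger a flush, which is the same effect the owner relies on to bound PD/etcd write frequency.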
7 changes: 4 additions & 3 deletions cdc/owner_test.go
@@ -60,7 +60,7 @@ func (s *ownerSuite) SetUpTest(c *check.C) {
DialTimeout: 3 * time.Second,
})
c.Assert(err, check.IsNil)
s.client = kv.NewCDCEtcdClient(client)
s.client = kv.NewCDCEtcdClient(context.TODO(), client)
s.ctx, s.cancel = context.WithCancel(context.Background())
s.errg = util.HandleErrWithErrGroup(s.ctx, s.e.Err(), func(e error) { c.Log(e) })
}
@@ -465,12 +465,13 @@ func (s *ownerSuite) TestHandleAdmin(c *check.C) {
c.Assert(err, check.IsNil)
sampleCF.sink = sink

capture, err := NewCapture(ctx, []string{s.clientURL.String()}, &security.Credential{}, "127.0.0.1:12034")
capture, err := NewCapture(ctx, []string{s.clientURL.String()},
&security.Credential{}, "127.0.0.1:12034", &processorOpts{flushCheckpointInterval: time.Millisecond * 200})
c.Assert(err, check.IsNil)
err = capture.Campaign(ctx)
c.Assert(err, check.IsNil)

owner, err := NewOwner(nil, &security.Credential{}, capture.session, DefaultCDCGCSafePointTTL)
owner, err := NewOwner(ctx, nil, &security.Credential{}, capture.session, DefaultCDCGCSafePointTTL, time.Millisecond*200)
c.Assert(err, check.IsNil)

sampleCF.etcdCli = owner.etcdClient
33 changes: 17 additions & 16 deletions cdc/processor.go
@@ -62,8 +62,6 @@ const (
defaultMemBufferCapacity int64 = 10 * 1024 * 1024 * 1024 // 10G

defaultSyncResolvedBatch = 1024

defaultFlushTaskPositionInterval = 200 * time.Millisecond
)

var (
@@ -86,10 +84,11 @@ type processor struct {

sink sink.Sink

sinkEmittedResolvedTs uint64
globalResolvedTs uint64
localResolvedTs uint64
checkpointTs uint64
sinkEmittedResolvedTs uint64
globalResolvedTs uint64
localResolvedTs uint64
checkpointTs uint64
flushCheckpointInterval time.Duration

ddlPuller puller.Puller
ddlPullerCancel context.CancelFunc
@@ -165,6 +164,7 @@ func newProcessor(
captureInfo model.CaptureInfo,
checkpointTs uint64,
errCh chan error,
flushCheckpointInterval time.Duration,
) (*processor, error) {
etcdCli := session.Client()
endpoints := session.Client().Endpoints()
@@ -177,7 +177,7 @@
if err != nil {
return nil, errors.Trace(err)
}
cdcEtcdCli := kv.NewCDCEtcdClient(etcdCli)
cdcEtcdCli := kv.NewCDCEtcdClient(ctx, etcdCli)
limitter := puller.NewBlurResourceLimmter(defaultMemBufferCapacity)

log.Info("start processor with startts", zap.Uint64("startts", checkpointTs))
@@ -396,11 +396,13 @@ func (p *processor) positionWorker(ctx context.Context) error {
// It is more accurate to get tso from PD, but in most cases we have
// deployed NTP service, a little bias is acceptable here.
metricResolvedTsLagGauge.Set(float64(oracle.GetPhysical(time.Now())-phyTs) / 1e3)

p.position.ResolvedTs = minResolvedTs
resolvedTsGauge.Set(float64(phyTs))
if err := retryFlushTaskStatusAndPosition(); err != nil {
return errors.Trace(err)

if p.position.ResolvedTs < minResolvedTs {
p.position.ResolvedTs = minResolvedTs
if err := retryFlushTaskStatusAndPosition(); err != nil {
return errors.Trace(err)
}
}
case <-p.localCheckpointTsReceiver.C:
checkpointTs := atomic.LoadUint64(&p.checkpointTs)
Expand All @@ -409,10 +411,7 @@ func (p *processor) positionWorker(ctx context.Context) error {
// deployed NTP service, a little bias is acceptable here.
metricCheckpointTsLagGauge.Set(float64(oracle.GetPhysical(time.Now())-phyTs) / 1e3)

if p.position.CheckPointTs >= checkpointTs {
continue
}
if time.Since(lastFlushTime) < defaultFlushTaskPositionInterval {
if time.Since(lastFlushTime) < p.flushCheckpointInterval {
continue
}

@@ -1169,6 +1168,7 @@ func runProcessor(
changefeedID string,
captureInfo model.CaptureInfo,
checkpointTs uint64,
flushCheckpointInterval time.Duration,
) (*processor, error) {
opts := make(map[string]string, len(info.Opts)+2)
for k, v := range info.Opts {
@@ -1188,7 +1188,8 @@
cancel()
return nil, errors.Trace(err)
}
processor, err := newProcessor(ctx, credential, session, info, sink, changefeedID, captureInfo, checkpointTs, errCh)
processor, err := newProcessor(ctx, credential, session, info, sink,
changefeedID, captureInfo, checkpointTs, errCh, flushCheckpointInterval)
if err != nil {
cancel()
return nil, err
Expand Down
42 changes: 31 additions & 11 deletions cdc/server.go
@@ -44,12 +44,14 @@ const (
)

type options struct {
pdEndpoints string
credential *security.Credential
addr string
advertiseAddr string
gcTTL int64
timezone *time.Location
pdEndpoints string
credential *security.Credential
addr string
advertiseAddr string
gcTTL int64
timezone *time.Location
ownerFlushInterval time.Duration
processorFlushInterval time.Duration
}

func (o *options) validateAndAdjust() error {
Expand Down Expand Up @@ -135,6 +137,20 @@ func Timezone(tz *time.Location) ServerOption {
}
}

// OwnerFlushInterval returns a ServerOption that sets the ownerFlushInterval
func OwnerFlushInterval(dur time.Duration) ServerOption {
return func(o *options) {
o.ownerFlushInterval = dur
}
}

// ProcessorFlushInterval returns a ServerOption that sets the processorFlushInterval
func ProcessorFlushInterval(dur time.Duration) ServerOption {
return func(o *options) {
o.processorFlushInterval = dur
}
}

// Credential returns a ServerOption that sets the TLS
func Credential(credential *security.Credential) ServerOption {
return func(o *options) {
@@ -170,7 +186,10 @@ func NewServer(opt ...ServerOption) (*Server, error) {
zap.String("address", opts.addr),
zap.String("advertise-address", opts.advertiseAddr),
zap.Int64("gc-ttl", opts.gcTTL),
zap.Any("timezone", opts.timezone))
zap.Any("timezone", opts.timezone),
zap.Duration("owner-flush-interval", opts.ownerFlushInterval),
zap.Duration("processor-flush-interval", opts.processorFlushInterval),
)

s := &Server{
opts: opts,
@@ -257,7 +276,7 @@ func (s *Server) campaignOwnerLoop(ctx context.Context) error {
continue
}
log.Info("campaign owner successfully", zap.String("capture", s.capture.info.ID))
owner, err := NewOwner(s.pdClient, s.opts.credential, s.capture.session, s.opts.gcTTL)
owner, err := NewOwner(ctx, s.pdClient, s.opts.credential, s.capture.session, s.opts.gcTTL, s.opts.ownerFlushInterval)
if err != nil {
log.Warn("create new owner failed", zap.Error(err))
continue
@@ -282,13 +301,14 @@
}

func (s *Server) run(ctx context.Context) (err error) {
capture, err := NewCapture(ctx, s.pdEndpoints, s.opts.credential, s.opts.advertiseAddr)
ctx = util.PutCaptureAddrInCtx(ctx, s.opts.advertiseAddr)
ctx = util.PutTimezoneInCtx(ctx, s.opts.timezone)
procOpts := &processorOpts{flushCheckpointInterval: s.opts.processorFlushInterval}
capture, err := NewCapture(ctx, s.pdEndpoints, s.opts.credential, s.opts.advertiseAddr, procOpts)
if err != nil {
return err
}
s.capture = capture
ctx = util.PutCaptureAddrInCtx(ctx, s.capture.info.AdvertiseAddr)
ctx = util.PutTimezoneInCtx(ctx, s.opts.timezone)
ctx, cancel := context.WithCancel(ctx)
defer cancel()

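Taken together, the server.go changes expose the two new intervals as functional options on NewServer. A hypothetical caller might wire them up as below; the import paths and the existence of options beyond those visible in this diff (PD endpoints, listen address, and so on) are assumptions based on the surrounding code:

```go
package main

import (
	"log"
	"time"

	"github.com/pingcap/ticdc/cdc"
	"github.com/pingcap/ticdc/pkg/security"
)

func main() {
	// Only options visible in this diff are used here; a real deployment
	// also needs PD endpoints, a listen address, etc.
	server, err := cdc.NewServer(
		cdc.Credential(&security.Credential{}),
		cdc.Timezone(time.UTC),
		// New in this PR: how often the owner flushes changefeed status to
		// etcd, and how often each processor flushes its task position.
		cdc.OwnerFlushInterval(200*time.Millisecond),
		cdc.ProcessorFlushInterval(100*time.Millisecond),
	)
	if err != nil {
		log.Fatal(err)
	}
	_ = server
}
```

Larger intervals reduce write pressure on PD and etcd at the cost of coarser checkpoint granularity, which is the trade-off this PR makes configurable.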