diff --git a/cdc/capture.go b/cdc/capture.go
index 9382a29ff9f..177a8b7cafe 100644
--- a/cdc/capture.go
+++ b/cdc/capture.go
@@ -38,6 +38,11 @@ const (
     captureSessionTTL = 3
 )
 
+// processorOpts records options for processor
+type processorOpts struct {
+    flushCheckpointInterval time.Duration
+}
+
 // Capture represents a Capture server, it monitors the changefeed information in etcd and schedules Task on it.
 type Capture struct {
     etcdClient kv.CDCEtcdClient
@@ -51,10 +56,18 @@ type Capture struct {
     // session keeps alive between the capture and etcd
     session  *concurrency.Session
     election *concurrency.Election
+
+    opts *processorOpts
 }
 
 // NewCapture returns a new Capture instance
-func NewCapture(ctx context.Context, pdEndpoints []string, credential *security.Credential, advertiseAddr string) (c *Capture, err error) {
+func NewCapture(
+    ctx context.Context,
+    pdEndpoints []string,
+    credential *security.Credential,
+    advertiseAddr string,
+    opts *processorOpts,
+) (c *Capture, err error) {
     tlsConfig, err := credential.ToTLSConfig()
     if err != nil {
         return nil, errors.Trace(err)
@@ -91,7 +104,7 @@ func NewCapture(ctx context.Context, pdEndpoints []string, credential *security.
         return nil, errors.Annotate(cerror.WrapError(cerror.ErrNewCaptureFailed, err), "create capture session")
     }
     elec := concurrency.NewElection(sess, kv.CaptureOwnerKey)
-    cli := kv.NewCDCEtcdClient(etcdCli)
+    cli := kv.NewCDCEtcdClient(ctx, etcdCli)
     id := uuid.New().String()
     info := &model.CaptureInfo{
         ID:            id,
@@ -107,6 +120,7 @@ func NewCapture(ctx context.Context, pdEndpoints []string, credential *security.
         session:    sess,
         election:   elec,
         info:       info,
+        opts:       opts,
     }
 
     return
@@ -224,7 +238,7 @@ func (c *Capture) assignTask(ctx context.Context, task *Task) (*processor, error
         zap.String("changefeedid", task.ChangeFeedID))
 
     p, err := runProcessor(
-        ctx, c.credential, c.session, *cf, task.ChangeFeedID, *c.info, task.CheckpointTS)
+        ctx, c.credential, c.session, *cf, task.ChangeFeedID, *c.info, task.CheckpointTS, c.opts.flushCheckpointInterval)
     if err != nil {
         log.Error("run processor failed",
             zap.String("changefeedid", task.ChangeFeedID),
diff --git a/cdc/kv/etcd.go b/cdc/kv/etcd.go
index 6c0b48e6093..5c80370e37b 100644
--- a/cdc/kv/etcd.go
+++ b/cdc/kv/etcd.go
@@ -24,6 +24,8 @@ import (
     cerror "github.com/pingcap/ticdc/pkg/errors"
     "github.com/pingcap/ticdc/pkg/etcd"
     "github.com/pingcap/ticdc/pkg/retry"
+    "github.com/pingcap/ticdc/pkg/util"
+    "github.com/prometheus/client_golang/prometheus"
     "go.etcd.io/etcd/clientv3"
     "go.etcd.io/etcd/clientv3/concurrency"
     "go.etcd.io/etcd/embed"
@@ -111,8 +113,17 @@ type CDCEtcdClient struct {
 }
 
 //NewCDCEtcdClient returns a new CDCEtcdClient
-func NewCDCEtcdClient(cli *clientv3.Client) CDCEtcdClient {
-    return CDCEtcdClient{Client: etcd.Wrap(cli)}
+func NewCDCEtcdClient(ctx context.Context, cli *clientv3.Client) CDCEtcdClient {
+    captureAddr := util.CaptureAddrFromCtx(ctx)
+    metrics := map[string]prometheus.Counter{
+        etcd.EtcdPut:    etcdRequestCounter.WithLabelValues(etcd.EtcdPut, captureAddr),
+        etcd.EtcdGet:    etcdRequestCounter.WithLabelValues(etcd.EtcdGet, captureAddr),
+        etcd.EtcdDel:    etcdRequestCounter.WithLabelValues(etcd.EtcdDel, captureAddr),
+        etcd.EtcdTxn:    etcdRequestCounter.WithLabelValues(etcd.EtcdTxn, captureAddr),
+        etcd.EtcdGrant:  etcdRequestCounter.WithLabelValues(etcd.EtcdGrant, captureAddr),
+        etcd.EtcdRevoke: etcdRequestCounter.WithLabelValues(etcd.EtcdRevoke, captureAddr),
+    }
+    return CDCEtcdClient{Client: etcd.Wrap(cli, metrics)}
 }
 
 // ClearAllCDCInfo delete all keys created by CDC
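The `NewCDCEtcdClient` change above resolves the capture address once and pre-binds one `prometheus.Counter` per etcd operation with `WithLabelValues`, so the request path only does a plain map lookup and `Inc()` instead of resolving labels on every call. A minimal, self-contained sketch of that pattern; the names `requestCounter` and `boundCounters` are illustrative and not part of the patch:

```go
package main

import "github.com/prometheus/client_golang/prometheus"

// requestCounter mirrors the shape of etcdRequestCounter: one counter vector
// labelled by operation type and capture address.
var requestCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "ticdc",
		Subsystem: "etcd",
		Name:      "request_count",
		Help:      "request counter of etcd operation",
	}, []string{"type", "capture"})

// boundCounters resolves each (operation, capture) label pair once, up front,
// the way NewCDCEtcdClient builds its metrics map.
func boundCounters(captureAddr string, ops []string) map[string]prometheus.Counter {
	m := make(map[string]prometheus.Counter, len(ops))
	for _, op := range ops {
		m[op] = requestCounter.WithLabelValues(op, captureAddr)
	}
	return m
}

func main() {
	prometheus.MustRegister(requestCounter)
	counters := boundCounters("127.0.0.1:8300", []string{"Put", "Get", "Del"})
	counters["Put"].Inc() // hot path: map lookup + Inc, no label resolution
}
```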
diff --git a/cdc/kv/etcd_test.go b/cdc/kv/etcd_test.go
index cd51f7999df..87f44415a3d 100644
--- a/cdc/kv/etcd_test.go
+++ b/cdc/kv/etcd_test.go
@@ -54,7 +54,7 @@ func (s *etcdSuite) SetUpTest(c *check.C) {
         DialTimeout: 3 * time.Second,
     })
     c.Assert(err, check.IsNil)
-    s.client = NewCDCEtcdClient(client)
+    s.client = NewCDCEtcdClient(context.TODO(), client)
     s.ctx, s.cancel = context.WithCancel(context.Background())
     s.errg = util.HandleErrWithErrGroup(s.ctx, s.e.Err(), func(e error) { c.Log(e) })
 }
diff --git a/cdc/kv/metrics.go b/cdc/kv/metrics.go
index 19ca3f80c3c..5c0854766b7 100644
--- a/cdc/kv/metrics.go
+++ b/cdc/kv/metrics.go
@@ -67,6 +67,13 @@ var (
             Name:      "channel_size",
             Help:      "size of each channel in kv client",
         }, []string{"id", "channel"})
+    etcdRequestCounter = prometheus.NewCounterVec(
+        prometheus.CounterOpts{
+            Namespace: "ticdc",
+            Subsystem: "etcd",
+            Name:      "request_count",
+            Help:      "request counter of etcd operation",
+        }, []string{"type", "capture"})
 )
 
 // InitMetrics registers all metrics in the kv package
@@ -78,4 +85,5 @@ func InitMetrics(registry *prometheus.Registry) {
     registry.MustRegister(pullEventCounter)
     registry.MustRegister(sendEventCounter)
     registry.MustRegister(clientChannelSize)
+    registry.MustRegister(etcdRequestCounter)
 }
diff --git a/cdc/owner.go b/cdc/owner.go
index a590fba7542..719593a7dc6 100644
--- a/cdc/owner.go
+++ b/cdc/owner.go
@@ -80,6 +80,9 @@ type Owner struct {
     gcTTL int64
     // last update gc safepoint time. zero time means has not updated or cleared
     gcSafepointLastUpdate time.Time
+    // record the last time the replication status of all changefeeds was flushed
+    lastFlushChangefeeds    time.Time
+    flushChangefeedInterval time.Duration
 }
 
 const (
@@ -90,25 +93,33 @@ const (
 )
 
 // NewOwner creates a new Owner instance
-func NewOwner(pdClient pd.Client, credential *security.Credential, sess *concurrency.Session, gcTTL int64) (*Owner, error) {
-    cli := kv.NewCDCEtcdClient(sess.Client())
+func NewOwner(
+    ctx context.Context,
+    pdClient pd.Client,
+    credential *security.Credential,
+    sess *concurrency.Session,
+    gcTTL int64,
+    flushChangefeedInterval time.Duration,
+) (*Owner, error) {
+    cli := kv.NewCDCEtcdClient(ctx, sess.Client())
     endpoints := sess.Client().Endpoints()
     owner := &Owner{
-        done:                  make(chan struct{}),
-        session:               sess,
-        pdClient:              pdClient,
-        credential:            credential,
-        changeFeeds:           make(map[model.ChangeFeedID]*changeFeed),
-        failInitFeeds:         make(map[model.ChangeFeedID]struct{}),
-        stoppedFeeds:          make(map[model.ChangeFeedID]*model.ChangeFeedStatus),
-        captures:              make(map[model.CaptureID]*model.CaptureInfo),
-        rebalanceTigger:       make(map[model.ChangeFeedID]bool),
-        manualScheduleCommand: make(map[model.ChangeFeedID][]*model.MoveTableJob),
-        pdEndpoints:           endpoints,
-        cfRWriter:             cli,
-        etcdClient:            cli,
-        gcTTL:                 gcTTL,
+        done:                    make(chan struct{}),
+        session:                 sess,
+        pdClient:                pdClient,
+        credential:              credential,
+        changeFeeds:             make(map[model.ChangeFeedID]*changeFeed),
+        failInitFeeds:           make(map[model.ChangeFeedID]struct{}),
+        stoppedFeeds:            make(map[model.ChangeFeedID]*model.ChangeFeedStatus),
+        captures:                make(map[model.CaptureID]*model.CaptureInfo),
+        rebalanceTigger:         make(map[model.ChangeFeedID]bool),
+        manualScheduleCommand:   make(map[model.ChangeFeedID][]*model.MoveTableJob),
+        pdEndpoints:             endpoints,
+        cfRWriter:               cli,
+        etcdClient:              cli,
+        gcTTL:                   gcTTL,
+        flushChangefeedInterval: flushChangefeedInterval,
     }
 
     return owner, nil
 
@@ -569,9 +580,12 @@ func (o *Owner) flushChangeFeedInfos(ctx context.Context) error {
             minCheckpointTs = changefeed.status.CheckpointTs
         }
     }
-    err := o.cfRWriter.PutAllChangeFeedStatus(ctx, snapshot)
-    if err != nil {
-        return errors.Trace(err)
+    if time.Since(o.lastFlushChangefeeds) > o.flushChangefeedInterval {
+        err := o.cfRWriter.PutAllChangeFeedStatus(ctx, snapshot)
+        if err != nil {
+            return errors.Trace(err)
+        }
+        o.lastFlushChangefeeds = time.Now()
     }
     }
     for _, status := range o.stoppedFeeds {
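With the fields added above, `flushChangeFeedInfos` now writes the changefeed status snapshot to etcd at most once per `flushChangefeedInterval` instead of on every loop iteration, and only advances `lastFlushChangefeeds` after a successful write. A minimal sketch of that throttle; `throttledFlusher` and `maybeFlush` are illustrative names that do not appear in the patch:

```go
package main

import (
	"fmt"
	"time"
)

// throttledFlusher mirrors the owner's throttling: an expensive flush runs at
// most once per interval, and calls in between are skipped.
type throttledFlusher struct {
	lastFlush time.Time
	interval  time.Duration
}

func (f *throttledFlusher) maybeFlush(flush func() error) error {
	if time.Since(f.lastFlush) <= f.interval {
		return nil // too soon since the last flush, skip this round
	}
	if err := flush(); err != nil {
		return err
	}
	// Advance the timestamp only after a successful flush, as the patch does.
	f.lastFlush = time.Now()
	return nil
}

func main() {
	f := &throttledFlusher{interval: 200 * time.Millisecond}
	for i := 0; i < 5; i++ {
		_ = f.maybeFlush(func() error { fmt.Println("flushed at", time.Now()); return nil })
		time.Sleep(60 * time.Millisecond) // only some iterations actually flush
	}
}
```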
diff --git a/cdc/owner_test.go b/cdc/owner_test.go
index 26abc7609cc..679e5cda256 100644
--- a/cdc/owner_test.go
+++ b/cdc/owner_test.go
@@ -60,7 +60,7 @@ func (s *ownerSuite) SetUpTest(c *check.C) {
         DialTimeout: 3 * time.Second,
     })
     c.Assert(err, check.IsNil)
-    s.client = kv.NewCDCEtcdClient(client)
+    s.client = kv.NewCDCEtcdClient(context.TODO(), client)
     s.ctx, s.cancel = context.WithCancel(context.Background())
     s.errg = util.HandleErrWithErrGroup(s.ctx, s.e.Err(), func(e error) { c.Log(e) })
 }
@@ -465,12 +465,13 @@ func (s *ownerSuite) TestHandleAdmin(c *check.C) {
     c.Assert(err, check.IsNil)
     sampleCF.sink = sink
 
-    capture, err := NewCapture(ctx, []string{s.clientURL.String()}, &security.Credential{}, "127.0.0.1:12034")
+    capture, err := NewCapture(ctx, []string{s.clientURL.String()},
+        &security.Credential{}, "127.0.0.1:12034", &processorOpts{flushCheckpointInterval: time.Millisecond * 200})
     c.Assert(err, check.IsNil)
     err = capture.Campaign(ctx)
     c.Assert(err, check.IsNil)
 
-    owner, err := NewOwner(nil, &security.Credential{}, capture.session, DefaultCDCGCSafePointTTL)
+    owner, err := NewOwner(ctx, nil, &security.Credential{}, capture.session, DefaultCDCGCSafePointTTL, time.Millisecond*200)
     c.Assert(err, check.IsNil)
 
     sampleCF.etcdCli = owner.etcdClient
diff --git a/cdc/processor.go b/cdc/processor.go
index e24c5f7cfab..b7ffff1e389 100644
--- a/cdc/processor.go
+++ b/cdc/processor.go
@@ -62,8 +62,6 @@ const (
     defaultMemBufferCapacity int64 = 10 * 1024 * 1024 * 1024 // 10G
 
     defaultSyncResolvedBatch = 1024
-
-    defaultFlushTaskPositionInterval = 200 * time.Millisecond
 )
 
 var (
@@ -86,10 +84,11 @@ type processor struct {
 
     sink sink.Sink
 
-    sinkEmittedResolvedTs uint64
-    globalResolvedTs      uint64
-    localResolvedTs       uint64
-    checkpointTs          uint64
+    sinkEmittedResolvedTs   uint64
+    globalResolvedTs        uint64
+    localResolvedTs         uint64
+    checkpointTs            uint64
+    flushCheckpointInterval time.Duration
 
     ddlPuller       puller.Puller
     ddlPullerCancel context.CancelFunc
@@ -165,6 +164,7 @@ func newProcessor(
     captureInfo model.CaptureInfo,
     checkpointTs uint64,
     errCh chan error,
+    flushCheckpointInterval time.Duration,
 ) (*processor, error) {
     etcdCli := session.Client()
     endpoints := session.Client().Endpoints()
@@ -177,7 +177,7 @@ func newProcessor(
     if err != nil {
         return nil, errors.Trace(err)
     }
-    cdcEtcdCli := kv.NewCDCEtcdClient(etcdCli)
+    cdcEtcdCli := kv.NewCDCEtcdClient(ctx, etcdCli)
     limitter := puller.NewBlurResourceLimmter(defaultMemBufferCapacity)
 
     log.Info("start processor with startts", zap.Uint64("startts", checkpointTs))
@@ -396,11 +396,13 @@ func (p *processor) positionWorker(ctx context.Context) error {
             // It is more accurate to get tso from PD, but in most cases we have
             // deployed NTP service, a little bias is acceptable here.
             metricResolvedTsLagGauge.Set(float64(oracle.GetPhysical(time.Now())-phyTs) / 1e3)
-
-            p.position.ResolvedTs = minResolvedTs
             resolvedTsGauge.Set(float64(phyTs))
-            if err := retryFlushTaskStatusAndPosition(); err != nil {
-                return errors.Trace(err)
+
+            if p.position.ResolvedTs < minResolvedTs {
+                p.position.ResolvedTs = minResolvedTs
+                if err := retryFlushTaskStatusAndPosition(); err != nil {
+                    return errors.Trace(err)
+                }
             }
         case <-p.localCheckpointTsReceiver.C:
             checkpointTs := atomic.LoadUint64(&p.checkpointTs)
@@ -409,10 +411,7 @@ func (p *processor) positionWorker(ctx context.Context) error {
             // deployed NTP service, a little bias is acceptable here.
             metricCheckpointTsLagGauge.Set(float64(oracle.GetPhysical(time.Now())-phyTs) / 1e3)
 
-            if p.position.CheckPointTs >= checkpointTs {
-                continue
-            }
-            if time.Since(lastFlushTime) < defaultFlushTaskPositionInterval {
+            if time.Since(lastFlushTime) < p.flushCheckpointInterval {
                 continue
             }
 
@@ -1169,6 +1168,7 @@ func runProcessor(
     changefeedID string,
     captureInfo model.CaptureInfo,
     checkpointTs uint64,
+    flushCheckpointInterval time.Duration,
 ) (*processor, error) {
     opts := make(map[string]string, len(info.Opts)+2)
     for k, v := range info.Opts {
@@ -1188,7 +1188,8 @@ func runProcessor(
         cancel()
         return nil, errors.Trace(err)
     }
-    processor, err := newProcessor(ctx, credential, session, info, sink, changefeedID, captureInfo, checkpointTs, errCh)
+    processor, err := newProcessor(ctx, credential, session, info, sink,
+        changefeedID, captureInfo, checkpointTs, errCh, flushCheckpointInterval)
     if err != nil {
         cancel()
         return nil, err
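The `positionWorker` changes above apply two guards before touching etcd: the resolved ts is only flushed when it actually advances, and checkpoint flushes are rate-limited by the now-configurable `flushCheckpointInterval`. A standalone sketch of that policy; the type and method names are illustrative, and the real processor updates its last-flush time outside the hunk shown:

```go
package main

import (
	"fmt"
	"time"
)

// flushPolicy sketches the two guards, not the processor's actual struct.
type flushPolicy struct {
	resolvedTs    uint64
	lastFlushTime time.Time
	interval      time.Duration
}

// onResolvedTs reports whether a flush should happen: the new value must be
// strictly larger, mirroring `if p.position.ResolvedTs < minResolvedTs`.
func (f *flushPolicy) onResolvedTs(newTs uint64) bool {
	if newTs <= f.resolvedTs {
		return false
	}
	f.resolvedTs = newTs
	return true
}

// onCheckpointTick rate-limits checkpoint flushes to at most one per interval.
func (f *flushPolicy) onCheckpointTick(now time.Time) bool {
	if now.Sub(f.lastFlushTime) < f.interval {
		return false
	}
	f.lastFlushTime = now
	return true
}

func main() {
	p := flushPolicy{interval: 100 * time.Millisecond}
	fmt.Println(p.onResolvedTs(5), p.onResolvedTs(5)) // true false
	fmt.Println(p.onCheckpointTick(time.Now()))       // true
}
```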
diff --git a/cdc/server.go b/cdc/server.go
index 55c8b74fa4f..a6c079f9a40 100644
--- a/cdc/server.go
+++ b/cdc/server.go
@@ -44,12 +44,14 @@ const (
 )
 
 type options struct {
-    pdEndpoints   string
-    credential    *security.Credential
-    addr          string
-    advertiseAddr string
-    gcTTL         int64
-    timezone      *time.Location
+    pdEndpoints             string
+    credential              *security.Credential
+    addr                    string
+    advertiseAddr           string
+    gcTTL                   int64
+    timezone                *time.Location
+    ownerFlushInterval      time.Duration
+    processorFlushInterval  time.Duration
 }
 
 func (o *options) validateAndAdjust() error {
@@ -135,6 +137,20 @@ func Timezone(tz *time.Location) ServerOption {
     }
 }
 
+// OwnerFlushInterval returns a ServerOption that sets the ownerFlushInterval
+func OwnerFlushInterval(dur time.Duration) ServerOption {
+    return func(o *options) {
+        o.ownerFlushInterval = dur
+    }
+}
+
+// ProcessorFlushInterval returns a ServerOption that sets the processorFlushInterval
+func ProcessorFlushInterval(dur time.Duration) ServerOption {
+    return func(o *options) {
+        o.processorFlushInterval = dur
+    }
+}
+
 // Credential returns a ServerOption that sets the TLS
 func Credential(credential *security.Credential) ServerOption {
     return func(o *options) {
@@ -170,7 +186,10 @@ func NewServer(opt ...ServerOption) (*Server, error) {
         zap.String("address", opts.addr),
         zap.String("advertise-address", opts.advertiseAddr),
         zap.Int64("gc-ttl", opts.gcTTL),
-        zap.Any("timezone", opts.timezone))
+        zap.Any("timezone", opts.timezone),
+        zap.Duration("owner-flush-interval", opts.ownerFlushInterval),
+        zap.Duration("processor-flush-interval", opts.processorFlushInterval),
+    )
 
     s := &Server{
         opts: opts,
@@ -257,7 +276,7 @@ func (s *Server) campaignOwnerLoop(ctx context.Context) error {
             continue
         }
         log.Info("campaign owner successfully", zap.String("capture", s.capture.info.ID))
-        owner, err := NewOwner(s.pdClient, s.opts.credential, s.capture.session, s.opts.gcTTL)
+        owner, err := NewOwner(ctx, s.pdClient, s.opts.credential, s.capture.session, s.opts.gcTTL, s.opts.ownerFlushInterval)
         if err != nil {
             log.Warn("create new owner failed", zap.Error(err))
             continue
@@ -282,13 +301,14 @@ func (s *Server) campaignOwnerLoop(ctx context.Context) error {
 }
 
 func (s *Server) run(ctx context.Context) (err error) {
-    capture, err := NewCapture(ctx, s.pdEndpoints, s.opts.credential, s.opts.advertiseAddr)
+    ctx = util.PutCaptureAddrInCtx(ctx, s.opts.advertiseAddr)
+    ctx = util.PutTimezoneInCtx(ctx, s.opts.timezone)
+    procOpts := &processorOpts{flushCheckpointInterval: s.opts.processorFlushInterval}
+    capture, err := NewCapture(ctx, s.pdEndpoints, s.opts.credential, s.opts.advertiseAddr, procOpts)
     if err != nil {
         return err
     }
     s.capture = capture
-    ctx = util.PutCaptureAddrInCtx(ctx, s.capture.info.AdvertiseAddr)
-    ctx = util.PutTimezoneInCtx(ctx, s.opts.timezone)
     ctx, cancel := context.WithCancel(ctx)
     defer cancel()
 
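`Server.run` now stores the advertise address (and timezone) in the context before `NewCapture` runs, so the etcd client built inside it can read the capture address via `util.CaptureAddrFromCtx` when labelling its metrics; the old ordering only set the address after the client had been created. A sketch of the context-value pattern this relies on, assuming an unexported key type; the real helpers in `pkg/util` may be implemented differently:

```go
package main

import (
	"context"
	"fmt"
)

// ctxKey is a private key type so context values cannot collide across
// packages. This mirrors the idea behind util.PutCaptureAddrInCtx and
// util.CaptureAddrFromCtx; it is not the actual implementation.
type ctxKey int

const captureAddrKey ctxKey = iota

func putCaptureAddr(ctx context.Context, addr string) context.Context {
	return context.WithValue(ctx, captureAddrKey, addr)
}

func captureAddrFrom(ctx context.Context) string {
	addr, _ := ctx.Value(captureAddrKey).(string) // empty string if unset
	return addr
}

func main() {
	ctx := putCaptureAddr(context.Background(), "127.0.0.1:8300")
	fmt.Println(captureAddrFrom(ctx))                  // 127.0.0.1:8300
	fmt.Println(captureAddrFrom(context.Background())) // "" when the address was never set
}
```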
diff --git a/cdc/task_test.go b/cdc/task_test.go
index 8243b4459c8..50d9438390d 100644
--- a/cdc/task_test.go
+++ b/cdc/task_test.go
@@ -49,7 +49,7 @@ func (s *taskSuite) SetUpTest(c *check.C) {
 
     // Create a task watcher
     capture := &Capture{
-        etcdClient: kv.NewCDCEtcdClient(client),
+        etcdClient: kv.NewCDCEtcdClient(context.TODO(), client),
         processors: make(map[string]*processor),
         info:       &model.CaptureInfo{ID: "task-suite-capture", AdvertiseAddr: "task-suite-addr"},
     }
@@ -75,7 +75,7 @@ func (s *taskSuite) TestNewTaskWatcher(c *check.C) {
     // initialize the PD service witch does not support to
     // be embeded.
     capture := &Capture{
-        etcdClient: kv.NewCDCEtcdClient(s.c),
+        etcdClient: kv.NewCDCEtcdClient(context.TODO(), s.c),
         processors: make(map[string]*processor),
         info:       &model.CaptureInfo{ID: "task-suite-capture", AdvertiseAddr: "task-suite-addr"},
     }
@@ -87,7 +87,7 @@ func (s *taskSuite) TestNewTaskWatcher(c *check.C) {
 }
 
 func (s *taskSuite) setupFeedInfo(c *check.C, changeFeedID string) {
-    client := kv.NewCDCEtcdClient(s.c)
+    client := kv.NewCDCEtcdClient(context.TODO(), s.c)
     // Create the change feed
     c.Assert(client.SaveChangeFeedInfo(s.c.Ctx(), &model.ChangeFeedInfo{
         SinkURI: "mysql://fake",
@@ -152,7 +152,7 @@ func (s *taskSuite) TestWatch(c *check.C) {
     s.setupFeedInfo(c, "changefeed-1")
     defer s.teardownFeedInfo(c, "changefeed-1")
 
-    client := kv.NewCDCEtcdClient(s.c)
+    client := kv.NewCDCEtcdClient(context.TODO(), s.c)
     // Watch with a canceled context
     failedCtx, cancel := context.WithCancel(context.Background())
     cancel()
diff --git a/cmd/client.go b/cmd/client.go
index f8464bebdb6..fb41665cc26 100644
--- a/cmd/client.go
+++ b/cmd/client.go
@@ -165,7 +165,7 @@ func newCliCommand() *cobra.Command {
             // PD embeds an etcd server.
                 return errors.Annotate(err, "fail to open PD etcd client")
             }
-            cdcEtcdCli = kv.NewCDCEtcdClient(etcdCli)
+            cdcEtcdCli = kv.NewCDCEtcdClient(defaultContext, etcdCli)
             pdCli, err = pd.NewClientWithContext(
                 defaultContext, pdEndpoints, credential.PDSecurityOption(),
                 pd.WithGRPCDialOptions(
diff --git a/cmd/server.go b/cmd/server.go
index 0de7f00a188..1bd61970099 100644
--- a/cmd/server.go
+++ b/cmd/server.go
@@ -15,6 +15,7 @@ package cmd
 
 import (
     "context"
+    "time"
 
     "github.com/pingcap/errors"
     "github.com/pingcap/log"
@@ -34,6 +35,9 @@ var (
     logFile  string
     logLevel string
 
+    ownerFlushInterval     time.Duration
+    processorFlushInterval time.Duration
+
     serverCmd = &cobra.Command{
         Use:   "server",
         Short: "Start a TiCDC capture server",
@@ -51,6 +55,8 @@ func init() {
     serverCmd.Flags().Int64Var(&gcTTL, "gc-ttl", cdc.DefaultCDCGCSafePointTTL, "CDC GC safepoint TTL duration, specified in seconds")
     serverCmd.Flags().StringVar(&logFile, "log-file", "", "log file path")
     serverCmd.Flags().StringVar(&logLevel, "log-level", "info", "log level (etc: debug|info|warn|error)")
+    serverCmd.Flags().DurationVar(&ownerFlushInterval, "owner-flush-interval", time.Millisecond*200, "interval for the owner to flush changefeed status")
+    serverCmd.Flags().DurationVar(&processorFlushInterval, "processor-flush-interval", time.Millisecond*100, "interval for the processor to flush task status")
     addSecurityFlags(serverCmd.Flags(), true /* isServer */)
 }
 
@@ -72,7 +78,10 @@ func runEServer(cmd *cobra.Command, args []string) error {
         cdc.AdvertiseAddress(advertiseAddr),
         cdc.GCTTL(gcTTL),
         cdc.Timezone(tz),
-        cdc.Credential(getCredential())}
+        cdc.Credential(getCredential()),
+        cdc.OwnerFlushInterval(ownerFlushInterval),
+        cdc.ProcessorFlushInterval(processorFlushInterval),
+    }
 
     server, err := cdc.NewServer(opts...)
     if err != nil {
         return errors.Annotate(err, "new server")
diff --git a/metrics/grafana/ticdc.json b/metrics/grafana/ticdc.json
index b19d9b8bfa1..d527a6d4fc0 100644
--- a/metrics/grafana/ticdc.json
+++ b/metrics/grafana/ticdc.json
@@ -4952,6 +4952,103 @@
         "align": false,
         "alignLevel": null
       }
+    },
+    {
+      "aliasColors": {},
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "${DS_TEST-CLUSTER}",
+      "description": "request count of etcd operation",
+      "fill": 1,
+      "fillGradient": 0,
+      "gridPos": {
+        "h": 7,
+        "w": 12,
+        "x": 0,
+        "y": 121
+      },
+      "hiddenSeries": false,
+      "id": 97,
+      "legend": {
+        "alignAsTable": true,
+        "avg": false,
+        "current": true,
+        "hideEmpty": true,
+        "hideZero": true,
+        "max": true,
+        "min": false,
+        "rightSide": true,
+        "show": true,
+        "total": false,
+        "values": true
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "nullPointMode": "null",
+      "options": {
+        "dataLinks": []
+      },
+      "paceLength": 10,
+      "percentage": false,
+      "pointradius": 2,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "sum(rate(ticdc_etcd_request_count{capture=~\"$capture\"}[1m])) by (instance, type)",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 1,
+          "legendFormat": "{{instance}}-{{type}}",
+          "refId": "A"
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeRegions": [],
+      "timeShift": null,
+      "title": "Etcd Request Count By Instance",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
     }
   ],
   "title": "Events",
@@ -5065,4 +5162,4 @@
   "title": "Test-Cluster-CDC",
   "uid": "YiGL8hBZ1",
   "version": 7
-}
\ No newline at end of file
+}
diff --git a/pkg/etcd/client.go b/pkg/etcd/client.go
index f9fc11a9ae9..6a4c5e7c1fa 100644
--- a/pkg/etcd/client.go
+++ b/pkg/etcd/client.go
@@ -20,18 +20,30 @@ import (
     "github.com/pingcap/errors"
     "github.com/pingcap/log"
     "github.com/pingcap/ticdc/pkg/retry"
+    "github.com/prometheus/client_golang/prometheus"
     "go.etcd.io/etcd/clientv3"
     "go.uber.org/zap"
 )
 
+// etcd operation names
+const (
+    EtcdPut    = "Put"
+    EtcdGet    = "Get"
+    EtcdTxn    = "Txn"
+    EtcdDel    = "Del"
+    EtcdGrant  = "Grant"
+    EtcdRevoke = "Revoke"
+)
+
 // Client is a simple wrapper that adds retry to etcd RPC
 type Client struct {
-    cli *clientv3.Client
+    cli     *clientv3.Client
+    metrics map[string]prometheus.Counter
 }
 
 // Wrap warps a clientv3.Client that provides etcd APIs required by TiCDC.
-func Wrap(cli *clientv3.Client) *Client {
-    return &Client{cli: cli}
+func Wrap(cli *clientv3.Client, metrics map[string]prometheus.Counter) *Client {
+    return &Client{cli: cli, metrics: metrics}
 }
 
 // Unwrap returns a clientv3.Client
@@ -39,7 +51,7 @@ func (c *Client) Unwrap() *clientv3.Client {
     return c.cli
 }
 
-func retryRPC(rpcName string, etcdRPC func() error) error {
+func retryRPC(rpcName string, metric prometheus.Counter, etcdRPC func() error) error {
     // By default, PD etcd sets [3s, 6s) for election timeout.
     // Some rpc could fail due to etcd errors, like "proposal dropped".
     // Retry at least two election timeout to handle the case that two PDs restarted
@@ -51,13 +63,16 @@ func retryRPC(rpcName string, etcdRPC func() error) error {
         if err != nil && errors.Cause(err) != context.Canceled {
             log.Warn("etcd RPC failed", zap.String("RPC", rpcName), zap.Error(err))
         }
+        if metric != nil {
+            metric.Inc()
+        }
         return err
     })
 }
 
 // Put delegates request to clientv3.KV.Put
 func (c *Client) Put(ctx context.Context, key, val string, opts ...clientv3.OpOption) (resp *clientv3.PutResponse, err error) {
-    err = retryRPC("Put", func() error {
+    err = retryRPC(EtcdPut, c.metrics[EtcdPut], func() error {
         var inErr error
         resp, inErr = c.cli.Put(ctx, key, val, opts...)
         return inErr
@@ -67,7 +82,7 @@ func (c *Client) Put(ctx context.Context, key, val string, opts ...clientv3.OpOp
 
 // Get delegates request to clientv3.KV.Get
 func (c *Client) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (resp *clientv3.GetResponse, err error) {
-    err = retryRPC("Get", func() error {
+    err = retryRPC(EtcdGet, c.metrics[EtcdGet], func() error {
         var inErr error
         resp, inErr = c.cli.Get(ctx, key, opts...)
         return inErr
@@ -77,18 +92,24 @@ func (c *Client) Get(ctx context.Context, key string, opts ...clientv3.OpOption)
 
 // Delete delegates request to clientv3.KV.Delete
 func (c *Client) Delete(ctx context.Context, key string, opts ...clientv3.OpOption) (resp *clientv3.DeleteResponse, err error) {
+    if metric, ok := c.metrics[EtcdDel]; ok {
+        metric.Inc()
+    }
     // We don't retry on delete operatoin. It's dangerous.
     return c.cli.Delete(ctx, key, opts...)
 }
 
 // Txn delegates request to clientv3.KV.Txn
 func (c *Client) Txn(ctx context.Context) clientv3.Txn {
+    if metric, ok := c.metrics[EtcdTxn]; ok {
+        metric.Inc()
+    }
     return c.cli.Txn(ctx)
 }
 
 // Grant delegates request to clientv3.Lease.Grant
 func (c *Client) Grant(ctx context.Context, ttl int64) (resp *clientv3.LeaseGrantResponse, err error) {
-    err = retryRPC("Grant", func() error {
+    err = retryRPC(EtcdGrant, c.metrics[EtcdGrant], func() error {
         var inErr error
         resp, inErr = c.cli.Grant(ctx, ttl)
         return inErr
@@ -98,7 +119,7 @@ func (c *Client) Grant(ctx context.Context, ttl int64) (resp *clientv3.LeaseGran
 
 // Revoke delegates request to clientv3.Lease.Revoke
 func (c *Client) Revoke(ctx context.Context, id clientv3.LeaseID) (resp *clientv3.LeaseRevokeResponse, err error) {
-    err = retryRPC("Revoke", func() error {
+    err = retryRPC(EtcdRevoke, c.metrics[EtcdRevoke], func() error {
         var inErr error
         resp, inErr = c.cli.Revoke(ctx, id)
         return inErr
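`Wrap` now accepts a metrics map and the retry helpers index it directly, e.g. `c.metrics[EtcdPut]`; passing `nil` — as the test below does — stays safe because indexing a nil map yields the zero value, `retryRPC` guards with `metric != nil`, and `Delete`/`Txn` use the comma-ok form. A tiny sketch of the language behaviour this relies on:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Indexing a nil map is legal and yields the zero value (nil for an
	// interface type), which is why Wrap(cli, nil) works in tests.
	var metrics map[string]prometheus.Counter
	c := metrics["Put"]
	fmt.Println(c == nil) // true

	if c != nil {
		c.Inc() // never reached with a nil map
	}
	if m, ok := metrics["Del"]; ok {
		m.Inc() // never reached either
	}
}
```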
diff --git a/pkg/etcd/client_test.go b/pkg/etcd/client_test.go
index 9bfac08fd74..d7df6e7c0af 100644
--- a/pkg/etcd/client_test.go
+++ b/pkg/etcd/client_test.go
@@ -46,7 +46,7 @@ func (m *mockClient) Put(ctx context.Context, key, val string, opts ...clientv3.
 func (s *clientSuite) TestRetry(c *check.C) {
     cli := clientv3.NewCtxClient(context.TODO())
     cli.KV = &mockClient{}
-    retrycli := Wrap(cli)
+    retrycli := Wrap(cli, nil)
     get, err := retrycli.Get(context.TODO(), "")
     c.Assert(err, check.IsNil)
     c.Assert(get, check.NotNil)
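A possible follow-up check, not part of this patch: a unit test could read a pre-bound counter back with `prometheus/testutil` to confirm the request counters actually move. A hypothetical sketch using the same label layout as `etcdRequestCounter`:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// Illustrative only: build a counter vector shaped like etcdRequestCounter
	// and verify a bound child's value after incrementing it.
	vec := prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "ticdc",
		Subsystem: "etcd",
		Name:      "request_count",
		Help:      "request counter of etcd operation",
	}, []string{"type", "capture"})

	put := vec.WithLabelValues("Put", "127.0.0.1:8300")
	put.Inc()
	put.Inc()

	fmt.Println(testutil.ToFloat64(put)) // 2
}
```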