Skip to content

Commit

Permalink
update minimal checkpointTS as gc safepoint to PD (#487)
Browse files Browse the repository at this point in the history
Signed-off-by: crazycs <crazycs520@gmail.com>
  • Loading branch information
crazycs520 authored Apr 27, 2020
1 parent 614bbc2 commit 8b17582
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 6 deletions.
27 changes: 25 additions & 2 deletions cdc/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"
"io"
"math"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -57,10 +58,15 @@ type Owner struct {
adminJobsLock sync.Mutex

stepDown func(ctx context.Context) error

// gcTTL is the ttl of cdc gc safepoint ttl.
gcTTL int64
}

const cdcServiceSafePointID = "ticdc"

// NewOwner creates a new Owner instance
func NewOwner(sess *concurrency.Session) (*Owner, error) {
func NewOwner(sess *concurrency.Session, gcTTL int64) (*Owner, error) {
cli := kv.NewCDCEtcdClient(sess.Client())
endpoints := sess.Client().Endpoints()
pdClient, err := pd.NewClient(endpoints, pd.SecurityOption{})
Expand All @@ -77,6 +83,7 @@ func NewOwner(sess *concurrency.Session) (*Owner, error) {
pdEndpoints: endpoints,
cfRWriter: cli,
etcdClient: cli,
gcTTL: gcTTL,
}

return owner, nil
Expand Down Expand Up @@ -298,11 +305,27 @@ func (o *Owner) balanceTables(ctx context.Context) error {
}

func (o *Owner) flushChangeFeedInfos(ctx context.Context) error {
if len(o.changeFeeds) == 0 {
return nil
}
snapshot := make(map[model.ChangeFeedID]*model.ChangeFeedStatus, len(o.changeFeeds))
minCheckpointTs := uint64(math.MaxUint64)
for id, changefeed := range o.changeFeeds {
snapshot[id] = changefeed.status
if changefeed.status.CheckpointTs < minCheckpointTs {
minCheckpointTs = changefeed.status.CheckpointTs
}
}
return errors.Trace(o.cfRWriter.PutAllChangeFeedStatus(ctx, snapshot))
err := o.cfRWriter.PutAllChangeFeedStatus(ctx, snapshot)
if err != nil {
return errors.Trace(err)
}
_, err = o.pdClient.UpdateServiceGCSafePoint(ctx, cdcServiceSafePointID, o.gcTTL, minCheckpointTs)
if err != nil {
log.Info("failed to update service safe point", zap.Error(err))
return errors.Trace(err)
}
return nil
}

// calcResolvedTs call calcResolvedTs of every changefeeds
Expand Down
21 changes: 18 additions & 3 deletions cdc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,25 @@ import (
"go.uber.org/zap"
)

const ownerRunInterval = time.Millisecond * 500
const (
ownerRunInterval = time.Millisecond * 500

// DefaultCDCGCSafePointTTL is the default value of cdc gc safe-point ttl, specified in seconds.
DefaultCDCGCSafePointTTL = 24 * 60 * 60
)

type options struct {
pdEndpoints string
statusHost string
statusPort int
gcTTL int64
}

var defaultServerOptions = options{
pdEndpoints: "http://127.0.0.1:2379",
statusHost: "127.0.0.1",
statusPort: defaultStatusPort,
gcTTL: DefaultCDCGCSafePointTTL,
}

// PDEndpoints returns a ServerOption that sets the endpoints of PD for the server.
Expand All @@ -60,6 +67,13 @@ func StatusPort(p int) ServerOption {
}
}

// GCTTL returns a ServerOption that sets the gc ttl.
func GCTTL(t int64) ServerOption {
return func(o *options) {
o.gcTTL = t
}
}

// A ServerOption sets options such as the addr of PD.
type ServerOption func(*options)

Expand All @@ -80,7 +94,8 @@ func NewServer(opt ...ServerOption) (*Server, error) {
log.Info("creating CDC server",
zap.String("pd-addr", opts.pdEndpoints),
zap.String("status-host", opts.statusHost),
zap.Int("status-port", opts.statusPort))
zap.Int("status-port", opts.statusPort),
zap.Int64("gc-ttl", opts.gcTTL))

s := &Server{
opts: opts,
Expand Down Expand Up @@ -134,7 +149,7 @@ func (s *Server) run(ctx context.Context) (err error) {
log.Error("campaign failed", zap.Error(err))
return
}
owner, err := NewOwner(s.capture.session)
owner, err := NewOwner(s.capture.session, s.opts.gcTTL)
if err != nil {
log.Error("new owner failed", zap.Error(err))
return
Expand Down
4 changes: 3 additions & 1 deletion cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
var (
serverPdAddr string
statusAddr string
gcTTL int64

serverCmd = &cobra.Command{
Use: "server",
Expand All @@ -33,6 +34,7 @@ func init() {

serverCmd.Flags().StringVar(&serverPdAddr, "pd", "http://127.0.0.1:2379", "PD address, separated by comma")
serverCmd.Flags().StringVar(&statusAddr, "status-addr", "127.0.0.1:8300", "Bind address for http status server")
serverCmd.Flags().Int64Var(&gcTTL, "gc-ttl", cdc.DefaultCDCGCSafePointTTL, "CDC GC safepoint TTL duration, specified in seconds")
}

func preRunLogInfo(cmd *cobra.Command, args []string) {
Expand All @@ -50,7 +52,7 @@ func runEServer(cmd *cobra.Command, args []string) error {
}

var opts []cdc.ServerOption
opts = append(opts, cdc.PDEndpoints(serverPdAddr), cdc.StatusHost(addrs[0]), cdc.StatusPort(int(statusPort)))
opts = append(opts, cdc.PDEndpoints(serverPdAddr), cdc.StatusHost(addrs[0]), cdc.StatusPort(int(statusPort)), cdc.GCTTL(gcTTL))

server, err := cdc.NewServer(opts...)
if err != nil {
Expand Down

0 comments on commit 8b17582

Please sign in to comment.