Skip to content

Commit

Permalink
cmd: support pause/resume/remove changefeed (#546)
Browse files Browse the repository at this point in the history
* cdc, cmd: add status address to capture info

Signed-off-by: Neil Shen <overvenus@gmail.com>

* tests: adjust start_cdc_server

Signed-off-by: Neil Shen <overvenus@gmail.com>

* cmd: support pause/resume/remove changefeed

Signed-off-by: Neil Shen <overvenus@gmail.com>

* fix test

Signed-off-by: Neil Shen <overvenus@gmail.com>

* address comments

Signed-off-by: Neil Shen <overvenus@gmail.com>

* cdc, cmd: replace status address with address and advertise address

Signed-off-by: Neil Shen <overvenus@gmail.com>

* correct parameter

Signed-off-by: Neil Shen <overvenus@gmail.com>

* use address in PD

Signed-off-by: Neil Shen <overvenus@gmail.com>
  • Loading branch information
overvenus authored May 11, 2020
1 parent 9b6353a commit ce12963
Show file tree
Hide file tree
Showing 33 changed files with 419 additions and 120 deletions.
8 changes: 5 additions & 3 deletions cdc/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type Capture struct {
}

// NewCapture returns a new Capture instance
func NewCapture(pdEndpoints []string) (c *Capture, err error) {
func NewCapture(pdEndpoints []string, advertiseAddr string) (c *Capture, err error) {
etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: pdEndpoints,
DialTimeout: 5 * time.Second,
Expand Down Expand Up @@ -83,9 +83,11 @@ func NewCapture(pdEndpoints []string) (c *Capture, err error) {
cli := kv.NewCDCEtcdClient(etcdCli)
id := uuid.New().String()
info := &model.CaptureInfo{
ID: id,
ID: id,
AdvertiseAddr: advertiseAddr,
}
log.Info("creating capture", zap.String("capture-id", id))
log.Info("creating capture",
zap.String("capture-id", id), zap.String("advertise-addr", advertiseAddr))

c = &Capture{
processors: make(map[string]*processor),
Expand Down
10 changes: 6 additions & 4 deletions cdc/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ import (
)

const (
opVarAdminJob = "admin-job"
opVarChangefeedID = "cf-id"
// APIOpVarAdminJob is the key of admin job in HTTP API
APIOpVarAdminJob = "admin-job"
// APIOpVarChangefeedID is the key of changefeed ID in HTTP API
APIOpVarChangefeedID = "cf-id"
)

type commonResp struct {
Expand Down Expand Up @@ -87,14 +89,14 @@ func (s *Server) handleChangefeedAdmin(w http.ResponseWriter, req *http.Request)
writeInternalServerError(w, err)
return
}
typeStr := req.Form.Get(opVarAdminJob)
typeStr := req.Form.Get(APIOpVarAdminJob)
typ, err := strconv.ParseInt(typeStr, 10, 64)
if err != nil {
writeError(w, http.StatusBadRequest, errors.Errorf("invalid admin job type: %s", typeStr))
return
}
job := model.AdminJob{
CfID: req.Form.Get(opVarChangefeedID),
CfID: req.Form.Get(APIOpVarChangefeedID),
Type: model.AdminJobType(typ),
}
err = s.owner.EnqueueJob(job)
Expand Down
17 changes: 11 additions & 6 deletions cdc/http_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"net/http/pprof"
"os"
Expand All @@ -30,9 +31,7 @@ import (
"go.uber.org/zap"
)

const defaultStatusPort = 8300

func (s *Server) startStatusHTTP() {
func (s *Server) startStatusHTTP() error {
serverMux := http.NewServeMux()

serverMux.HandleFunc("/debug/pprof/", pprof.Index)
Expand All @@ -49,15 +48,21 @@ func (s *Server) startStatusHTTP() {
prometheus.DefaultGatherer = registry
serverMux.Handle("/metrics", promhttp.Handler())

addr := fmt.Sprintf("%s:%d", s.opts.statusHost, s.opts.statusPort)
addr := s.opts.addr
s.statusServer = &http.Server{Addr: addr, Handler: serverMux}
log.Info("status http server is running", zap.String("addr", addr))

ln, err := net.Listen("tcp", addr)
if err != nil {
return err
}
go func() {
err := s.statusServer.ListenAndServe()
log.Info("status http server is running", zap.String("addr", addr))
err = s.statusServer.Serve(ln)
if err != nil && err != http.ErrServerClosed {
log.Error("status server error", zap.Error(err))
}
}()
return nil
}

// status of cdc server
Expand Down
19 changes: 14 additions & 5 deletions cdc/http_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,16 @@ var _ = check.Suite(&httpStatusSuite{})

const retryTime = 20

var testingServerOptions = options{
pdEndpoints: "http://127.0.0.1:2379",
addr: "127.0.0.1:8300",
advertiseAddr: "127.0.0.1:8300",
timezone: nil,
gcTTL: DefaultCDCGCSafePointTTL,
}

func (s *httpStatusSuite) waitUntilServerOnline(c *check.C) {
statusURL := fmt.Sprintf("http://%s:%d/status", defaultServerOptions.statusHost, defaultServerOptions.statusPort)
statusURL := fmt.Sprintf("http://%s/status", testingServerOptions.advertiseAddr)
for i := 0; i < retryTime; i++ {
resp, err := http.Get(statusURL)
if err == nil {
Expand All @@ -44,8 +52,9 @@ func (s *httpStatusSuite) waitUntilServerOnline(c *check.C) {
}

func (s *httpStatusSuite) TestHTTPStatus(c *check.C) {
server := &Server{opts: defaultServerOptions}
server.startStatusHTTP()
server := &Server{opts: testingServerOptions}
err := server.startStatusHTTP()
c.Assert(err, check.IsNil)
defer func() {
c.Assert(server.statusServer.Close(), check.IsNil)
}()
Expand All @@ -57,7 +66,7 @@ func (s *httpStatusSuite) TestHTTPStatus(c *check.C) {
}

func testPprof(c *check.C) {
resp, err := http.Get(fmt.Sprintf("http://%s:%d/debug/pprof/cmdline", defaultServerOptions.statusHost, defaultServerOptions.statusPort))
resp, err := http.Get(fmt.Sprintf("http://%s/debug/pprof/cmdline", testingServerOptions.advertiseAddr))
c.Assert(err, check.IsNil)
defer resp.Body.Close()
c.Assert(resp.StatusCode, check.Equals, 200)
Expand All @@ -66,7 +75,7 @@ func testPprof(c *check.C) {
}

func testReisgnOwner(c *check.C) {
uri := fmt.Sprintf("http://%s:%d/capture/owner/resign", defaultServerOptions.statusHost, defaultServerOptions.statusPort)
uri := fmt.Sprintf("http://%s/capture/owner/resign", testingServerOptions.advertiseAddr)
resp, err := http.Get(uri)
c.Assert(err, check.IsNil)
defer resp.Body.Close()
Expand Down
3 changes: 2 additions & 1 deletion cdc/model/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import (

// CaptureInfo store in etcd.
type CaptureInfo struct {
ID string `json:"id"`
ID string `json:"id"`
AdvertiseAddr string `json:"address"`
}

// Marshal using json.Marshal.
Expand Down
6 changes: 4 additions & 2 deletions cdc/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,8 @@ func (o *Owner) watchCapture(ctx context.Context) error {
return errors.Trace(err)
}
log.Debug("capture deleted",
zap.String("captureid", c.ID))
zap.String("capture-id", c.ID),
zap.String("advertise-aadr", c.AdvertiseAddr))
o.removeCapture(c)
case clientv3.EventTypePut:
if !ev.IsCreate() {
Expand All @@ -771,7 +772,8 @@ func (o *Owner) watchCapture(ctx context.Context) error {
return errors.Trace(err)
}
log.Debug("capture added",
zap.String("captureid", c.ID))
zap.String("capture-id", c.ID),
zap.String("advertise-aadr", c.AdvertiseAddr))
o.addCapture(c)
}
}
Expand Down
60 changes: 37 additions & 23 deletions cdc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,27 @@ const (
)

type options struct {
pdEndpoints string
statusHost string
statusPort int
gcTTL int64
timezone *time.Location
pdEndpoints string
addr string
advertiseAddr string
gcTTL int64
timezone *time.Location
}

var defaultServerOptions = options{
pdEndpoints: "http://127.0.0.1:2379",
statusHost: "127.0.0.1",
statusPort: defaultStatusPort,
timezone: nil,
gcTTL: DefaultCDCGCSafePointTTL,
func (o *options) validateAndAdjust() error {
if o.pdEndpoints == "" {
return errors.New("empty PD address")
}
if o.addr == "" {
return errors.New("empty address")
}
if o.advertiseAddr == "" {
o.advertiseAddr = o.addr
}
if o.gcTTL == 0 {
return errors.New("empty GC TTL is not allowed")
}
return nil
}

// PDEndpoints returns a ServerOption that sets the endpoints of PD for the server.
Expand All @@ -57,17 +65,17 @@ func PDEndpoints(s string) ServerOption {
}
}

// StatusHost returns a ServerOption that sets the status server host
func StatusHost(s string) ServerOption {
// Address returns a ServerOption that sets the server listen address
func Address(s string) ServerOption {
return func(o *options) {
o.statusHost = s
o.addr = s
}
}

// StatusPort returns a ServerOption that sets the status server port
func StatusPort(p int) ServerOption {
// AdvertiseAddress returns a ServerOption that sets the server advertise address
func AdvertiseAddress(s string) ServerOption {
return func(o *options) {
o.statusPort = p
o.advertiseAddr = s
}
}

Expand Down Expand Up @@ -98,14 +106,17 @@ type Server struct {

// NewServer creates a Server instance.
func NewServer(opt ...ServerOption) (*Server, error) {
opts := defaultServerOptions
opts := options{}
for _, o := range opt {
o(&opts)
}
if err := opts.validateAndAdjust(); err != nil {
return nil, err
}
log.Info("creating CDC server",
zap.String("pd-addr", opts.pdEndpoints),
zap.String("status-host", opts.statusHost),
zap.Int("status-port", opts.statusPort),
zap.String("address", opts.addr),
zap.String("advertise-address", opts.advertiseAddr),
zap.Int64("gc-ttl", opts.gcTTL),
zap.Any("timezone", opts.timezone))

Expand All @@ -117,8 +128,10 @@ func NewServer(opt ...ServerOption) (*Server, error) {

// Run runs the server.
func (s *Server) Run(ctx context.Context) error {
s.startStatusHTTP()

err := s.startStatusHTTP()
if err != nil {
return err
}
// When a capture suicided, restart it
for {
if err := s.run(ctx); errors.Cause(err) != ErrSuicide {
Expand Down Expand Up @@ -169,7 +182,8 @@ func (s *Server) campaignOwnerLoop(ctx context.Context) error {
}

func (s *Server) run(ctx context.Context) (err error) {
capture, err := NewCapture(strings.Split(s.opts.pdEndpoints, ","))
capture, err := NewCapture(
strings.Split(s.opts.pdEndpoints, ","), s.opts.advertiseAddr)
if err != nil {
return err
}
Expand Down
48 changes: 48 additions & 0 deletions cdc/server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package cdc

import (
"github.com/pingcap/check"
)

type serverOptionSuite struct{}

var _ = check.Suite(&serverOptionSuite{})

func (s *serverOptionSuite) TestNewServer(c *check.C) {
svr, err := NewServer()
c.Assert(svr, check.IsNil)
c.Assert(err, check.ErrorMatches, "empty PD address")

svr, err = NewServer(PDEndpoints("pd"))
c.Assert(svr, check.IsNil)
c.Assert(err, check.ErrorMatches, "empty address")

svr, err = NewServer(PDEndpoints("pd"), Address("cdc"))
c.Assert(svr, check.IsNil)
c.Assert(err, check.ErrorMatches, "empty GC TTL is not allowed")

svr, err = NewServer(PDEndpoints("pd"), Address("cdc"), GCTTL(DefaultCDCGCSafePointTTL))
c.Assert(svr, check.NotNil)
c.Assert(err, check.IsNil)
c.Assert(svr.opts.advertiseAddr, check.Equals, "cdc")

svr, err = NewServer(PDEndpoints("pd"), Address("cdc"), GCTTL(DefaultCDCGCSafePointTTL),
AdvertiseAddress("advertise"))
c.Assert(svr, check.NotNil)
c.Assert(err, check.IsNil)
c.Assert(svr.opts.addr, check.Equals, "cdc")
c.Assert(svr.opts.advertiseAddr, check.Equals, "advertise")
}
4 changes: 2 additions & 2 deletions cdc/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (s *taskSuite) SetUpTest(c *check.C) {
pdEndpoints: s.endpoints,
etcdClient: kv.NewCDCEtcdClient(client),
processors: make(map[string]*processor),
info: &model.CaptureInfo{ID: "task-suite-capture"},
info: &model.CaptureInfo{ID: "task-suite-capture", AdvertiseAddr: "task-suite-addr"},
}
c.Assert(capture, check.NotNil)
watcher := NewTaskWatcher(capture, &TaskWatcherConfig{
Expand All @@ -79,7 +79,7 @@ func (s *taskSuite) TestNewTaskWatcher(c *check.C) {
pdEndpoints: s.endpoints,
etcdClient: kv.NewCDCEtcdClient(s.c),
processors: make(map[string]*processor),
info: &model.CaptureInfo{ID: "task-suite-capture"},
info: &model.CaptureInfo{ID: "task-suite-capture", AdvertiseAddr: "task-suite-addr"},
}
c.Assert(capture, check.NotNil)
c.Assert(NewTaskWatcher(capture, &TaskWatcherConfig{
Expand Down
Loading

0 comments on commit ce12963

Please sign in to comment.