Skip to content

Commit

Permalink
Merge branch 'release-5.3' into cherry-pick-3277-to-release-5.3
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Dec 20, 2021
2 parents 36dc613 + a2c1989 commit feeba8f
Show file tree
Hide file tree
Showing 361 changed files with 1,797 additions and 787 deletions.
12 changes: 12 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ linters:
- unconvert
- unparam
- revive
- depguard

linters-settings:
revive:
Expand All @@ -28,3 +29,14 @@ linters-settings:
- name: superfluous-else
- name: modifies-parameter
- name: unreachable-code

depguard:
list-type: blacklist
include-go-root: false
packages:
- log
- github.com/juju/errors
packages-with-error-message:
# specify an error message to output when a blacklisted package is used
- log: "logging is allowed only by pingcap/log"
- github.com/juju/errors: "error handling is allowed only by pingcap/errors"
15 changes: 10 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,10 @@ integration_test_build: check_failpoint_ctl
integration_test: integration_test_mysql

integration_test_mysql:
tests/run.sh mysql "$(CASE)"
tests/integration_tests/run.sh mysql "$(CASE)"

integration_test_kafka: check_third_party_binary
tests/run.sh kafka "$(CASE)"
tests/integration_tests/run.sh kafka "$(CASE)"

fmt: tools/bin/gofumports tools/bin/shfmt
@echo "gofmt (simplify)"
Expand Down Expand Up @@ -201,15 +201,20 @@ check-static: tools/bin/golangci-lint

check: check-copyright fmt check-static tidy terror_check errdoc check-leaktest-added check-merge-conflicts

coverage: tools/bin/gocovmerge tools/bin/goveralls
integration_test_coverage: tools/bin/gocovmerge tools/bin/goveralls
tools/bin/gocovmerge "$(TEST_DIR)"/cov.* | grep -vE ".*.pb.go|$(CDC_PKG)/testing_utils/.*|$(CDC_PKG)/cdc/kv/testing.go|$(CDC_PKG)/cdc/entry/schema_test_helper.go|$(CDC_PKG)/cdc/sink/simple_mysql_tester.go|.*.__failpoint_binding__.go" > "$(TEST_DIR)/all_cov.out"
grep -vE ".*.pb.go|$(CDC_PKG)/testing_utils/.*|$(CDC_PKG)/cdc/kv/testing.go|$(CDC_PKG)/cdc/sink/simple_mysql_tester.go|.*.__failpoint_binding__.go" "$(TEST_DIR)/cov.unit.out" > "$(TEST_DIR)/unit_cov.out"
ifeq ("$(JenkinsCI)", "1")
GO111MODULE=off go get github.com/mattn/goveralls
tools/bin/goveralls -coverprofile=$(TEST_DIR)/all_cov.out -service=jenkins-ci -repotoken $(COVERALLS_TOKEN)
@bash <(curl -s https://codecov.io/bash) -f $(TEST_DIR)/unit_cov.out -t $(CODECOV_TOKEN)
else
go tool cover -html "$(TEST_DIR)/all_cov.out" -o "$(TEST_DIR)/all_cov.html"
endif

unit_test_coverage:
grep -vE ".*.pb.go|$(CDC_PKG)/testing_utils/.*|$(CDC_PKG)/cdc/kv/testing.go|$(CDC_PKG)/cdc/sink/simple_mysql_tester.go|.*.__failpoint_binding__.go" "$(TEST_DIR)/cov.unit.out" > "$(TEST_DIR)/unit_cov.out"
ifeq ("$(JenkinsCI)", "1")
@bash <(curl -s https://codecov.io/bash) -f $(TEST_DIR)/unit_cov.out -t $(CODECOV_TOKEN)
else
go tool cover -html "$(TEST_DIR)/unit_cov.out" -o "$(TEST_DIR)/unit_cov.html"
go tool cover -func="$(TEST_DIR)/unit_cov.out"
endif
Expand Down
33 changes: 23 additions & 10 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/etcd"
"github.com/pingcap/ticdc/pkg/orchestrator"
"github.com/pingcap/ticdc/pkg/pdtime"
"github.com/pingcap/ticdc/pkg/version"
tidbkv "github.com/pingcap/tidb/kv"
pd "github.com/tikv/pd/client"
Expand All @@ -55,10 +56,11 @@ type Capture struct {
session *concurrency.Session
election *concurrency.Election

pdClient pd.Client
kvStorage tidbkv.Storage
etcdClient *etcd.CDCEtcdClient
grpcPool kv.GrpcPool
pdClient pd.Client
kvStorage tidbkv.Storage
etcdClient *etcd.CDCEtcdClient
grpcPool kv.GrpcPool
TimeAcquirer pdtime.TimeAcquirer

cancel context.CancelFunc

Expand Down Expand Up @@ -100,6 +102,12 @@ func (c *Capture) reset(ctx context.Context) error {
}
c.session = sess
c.election = concurrency.NewElection(sess, etcd.CaptureOwnerKey)

if c.TimeAcquirer != nil {
c.TimeAcquirer.Stop()
}
c.TimeAcquirer = pdtime.NewTimeAcquirer(c.pdClient)

if c.grpcPool != nil {
c.grpcPool.Close()
}
Expand Down Expand Up @@ -148,11 +156,12 @@ func (c *Capture) Run(ctx context.Context) error {

func (c *Capture) run(stdCtx context.Context) error {
ctx := cdcContext.NewContext(stdCtx, &cdcContext.GlobalVars{
PDClient: c.pdClient,
KVStorage: c.kvStorage,
CaptureInfo: c.info,
EtcdClient: c.etcdClient,
GrpcPool: c.grpcPool,
PDClient: c.pdClient,
KVStorage: c.kvStorage,
CaptureInfo: c.info,
EtcdClient: c.etcdClient,
GrpcPool: c.grpcPool,
TimeAcquirer: c.TimeAcquirer,
})
err := c.register(ctx)
if err != nil {
Expand All @@ -166,7 +175,7 @@ func (c *Capture) run(stdCtx context.Context) error {
cancel()
}()
wg := new(sync.WaitGroup)
wg.Add(3)
wg.Add(4)
var ownerErr, processorErr error
go func() {
defer wg.Done()
Expand All @@ -188,6 +197,10 @@ func (c *Capture) run(stdCtx context.Context) error {
processorErr = c.runEtcdWorker(ctx, c.processorManager, orchestrator.NewGlobalState(), processorFlushInterval)
log.Info("the processor routine has exited", zap.Error(processorErr))
}()
go func() {
defer wg.Done()
c.TimeAcquirer.Run(ctx)
}()
go func() {
defer wg.Done()
c.grpcPool.RecycleConn(ctx)
Expand Down
34 changes: 21 additions & 13 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package owner
import (
"context"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -49,6 +48,8 @@ type changefeed struct {
sink AsyncSink
ddlPuller DDLPuller
initialized bool
// isRemoved is true if the changefeed is removed
isRemoved bool

// only used for asyncExecDDL function
// ddlEventCache is not nil when the changefeed is executing a DDL event asynchronously
Expand Down Expand Up @@ -119,7 +120,7 @@ func (c *changefeed) Tick(ctx cdcContext.Context, state *orchestrator.Changefeed
Code: code,
Message: err.Error(),
})
c.releaseResources()
c.releaseResources(ctx)
}
}

Expand Down Expand Up @@ -148,7 +149,8 @@ func (c *changefeed) tick(ctx cdcContext.Context, state *orchestrator.Changefeed
}

if !c.feedStateManager.ShouldRunning() {
c.releaseResources()
c.isRemoved = c.feedStateManager.ShouldRemoved()
c.releaseResources(ctx)
return nil
}

Expand Down Expand Up @@ -181,7 +183,9 @@ func (c *changefeed) tick(ctx cdcContext.Context, state *orchestrator.Changefeed
return errors.Trace(err)
}
if shouldUpdateState {
c.updateStatus(barrierTs)
pdTime, _ := ctx.GlobalVars().TimeAcquirer.CurrentTimeFromCached()
currentTs := oracle.GetPhysical(pdTime)
c.updateStatus(currentTs, barrierTs)
}
return nil
}
Expand Down Expand Up @@ -282,16 +286,22 @@ LOOP:
return nil
}

func (c *changefeed) releaseResources() {
func (c *changefeed) releaseResources(ctx context.Context) {
if !c.initialized {
return
}
log.Info("close changefeed", zap.String("changefeed", c.state.ID),
zap.Stringer("info", c.state.Info))
zap.Stringer("info", c.state.Info), zap.Bool("isRemoved", c.isRemoved))
c.cancel()
c.cancel = func() {}
c.ddlPuller.Close()
c.schema = nil
if c.isRemoved && c.redoManager.Enabled() {
err := c.redoManager.Cleanup(ctx)
if err != nil {
log.Error("cleanup redo logs failed", zap.String("changefeed", c.id), zap.Error(err))
}
}
ctx, cancel := context.WithCancel(context.Background())
cancel()
// We don't need to wait sink Close, pass a canceled context is ok
Expand Down Expand Up @@ -455,7 +465,7 @@ func (c *changefeed) asyncExecDDL(ctx cdcContext.Context, job *timodel.Job) (don
return done, nil
}

func (c *changefeed) updateStatus(barrierTs model.Ts) {
func (c *changefeed) updateStatus(currentTs int64, barrierTs model.Ts) {
resolvedTs := barrierTs
for _, position := range c.state.TaskPositions {
if resolvedTs > position.ResolvedTs {
Expand Down Expand Up @@ -487,14 +497,12 @@ func (c *changefeed) updateStatus(barrierTs model.Ts) {
}
return status, changed, nil
})

phyTs := oracle.ExtractPhysical(checkpointTs)

c.metricsChangefeedCheckpointTsGauge.Set(float64(phyTs))
// It is more accurate to get tso from PD, but in most cases since we have
// deployed NTP service, a little bias is acceptable here.
c.metricsChangefeedCheckpointTsLagGauge.Set(float64(oracle.GetPhysical(time.Now())-phyTs) / 1e3)
c.metricsChangefeedCheckpointTsLagGauge.Set(float64(currentTs-phyTs) / 1e3)
}

func (c *changefeed) Close() {
c.releaseResources()
func (c *changefeed) Close(ctx context.Context) {
c.releaseResources(ctx)
}
12 changes: 7 additions & 5 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/ticdc/pkg/config"
cdcContext "github.com/pingcap/ticdc/pkg/context"
"github.com/pingcap/ticdc/pkg/orchestrator"
"github.com/pingcap/ticdc/pkg/pdtime"
"github.com/pingcap/ticdc/pkg/txnutil/gc"
"github.com/pingcap/ticdc/pkg/util/testleak"
"github.com/pingcap/ticdc/pkg/version"
Expand Down Expand Up @@ -166,7 +167,7 @@ func (s *changefeedSuite) TestInitialize(c *check.C) {
defer testleak.AfterTest(c)()
ctx := cdcContext.NewBackendContext4Test(true)
cf, state, captures, tester := createChangefeed4Test(ctx, c)
defer cf.Close()
defer cf.Close(ctx)
// pre check
cf.Tick(ctx, state, captures)
tester.MustApplyPatches()
Expand All @@ -181,7 +182,7 @@ func (s *changefeedSuite) TestHandleError(c *check.C) {
defer testleak.AfterTest(c)()
ctx := cdcContext.NewBackendContext4Test(true)
cf, state, captures, tester := createChangefeed4Test(ctx, c)
defer cf.Close()
defer cf.Close(ctx)
// pre check
cf.Tick(ctx, state, captures)
tester.MustApplyPatches()
Expand Down Expand Up @@ -216,6 +217,7 @@ func (s *changefeedSuite) TestExecDDL(c *check.C) {
AdvertiseAddr: "127.0.0.1:0000",
Version: version.ReleaseVersion,
},
TimeAcquirer: pdtime.NewTimeAcquirer4Test(),
})
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: "changefeed-id-test",
Expand All @@ -226,7 +228,7 @@ func (s *changefeedSuite) TestExecDDL(c *check.C) {
})

cf, state, captures, tester := createChangefeed4Test(ctx, c)
defer cf.Close()
defer cf.Close(ctx)
tickThreeTime := func() {
cf.Tick(ctx, state, captures)
tester.MustApplyPatches()
Expand Down Expand Up @@ -298,7 +300,7 @@ func (s *changefeedSuite) TestSyncPoint(c *check.C) {
ctx.ChangefeedVars().Info.SyncPointEnabled = true
ctx.ChangefeedVars().Info.SyncPointInterval = 1 * time.Second
cf, state, captures, tester := createChangefeed4Test(ctx, c)
defer cf.Close()
defer cf.Close(ctx)

// pre check
cf.Tick(ctx, state, captures)
Expand Down Expand Up @@ -329,7 +331,7 @@ func (s *changefeedSuite) TestFinished(c *check.C) {
ctx := cdcContext.NewBackendContext4Test(true)
ctx.ChangefeedVars().Info.TargetTs = ctx.ChangefeedVars().Info.StartTs + 1000
cf, state, captures, tester := createChangefeed4Test(ctx, c)
defer cf.Close()
defer cf.Close(ctx)

// pre check
cf.Tick(ctx, state, captures)
Expand Down
16 changes: 14 additions & 2 deletions cdc/owner/feed_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ import (
type feedStateManager struct {
state *orchestrator.ChangefeedReactorState
shouldBeRunning bool
// Based on shouldBeRunning = false
// shouldBeRemoved = true means the changefeed is removed
// shouldBeRemoved = false means the changefeed is paused
shouldBeRemoved bool

adminJobQueue []*model.AdminJob
}
Expand All @@ -49,7 +53,11 @@ func (m *feedStateManager) Tick(state *orchestrator.ChangefeedReactorState) {
return
}
switch m.state.Info.State {
case model.StateStopped, model.StateFailed, model.StateRemoved, model.StateFinished:
case model.StateRemoved:
m.shouldBeRunning = false
m.shouldBeRemoved = true
return
case model.StateStopped, model.StateFailed, model.StateFinished:
m.shouldBeRunning = false
return
}
Expand All @@ -61,6 +69,10 @@ func (m *feedStateManager) ShouldRunning() bool {
return m.shouldBeRunning
}

func (m *feedStateManager) ShouldRemoved() bool {
return m.shouldBeRemoved
}

func (m *feedStateManager) MarkFinished() {
if m.state == nil {
// when state is nil, it means that Tick has never been called
Expand Down Expand Up @@ -111,6 +123,7 @@ func (m *feedStateManager) handleAdminJob() (jobsPending bool) {
return
}
m.shouldBeRunning = false
m.shouldBeRemoved = true
jobsPending = true

// remove changefeedInfo
Expand All @@ -123,7 +136,6 @@ func (m *feedStateManager) handleAdminJob() (jobsPending bool) {
})
checkpointTs := m.state.Info.GetCheckpointTs(m.state.Status)
log.Info("the changefeed is removed", zap.String("changefeed-id", m.state.ID), zap.Uint64("checkpoint-ts", checkpointTs))

case model.AdminResume:
switch m.state.Info.State {
case model.StateFailed, model.StateError, model.StateStopped, model.StateFinished:
Expand Down
6 changes: 6 additions & 0 deletions cdc/owner/feed_state_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func (s *feedStateManagerSuite) TestHandleJob(c *check.C) {
manager.Tick(state)
tester.MustApplyPatches()
c.Assert(manager.ShouldRunning(), check.IsFalse)
c.Assert(manager.ShouldRemoved(), check.IsFalse)
c.Assert(state.Info.State, check.Equals, model.StateStopped)
c.Assert(state.Info.AdminJobType, check.Equals, model.AdminStop)
c.Assert(state.Status.AdminJobType, check.Equals, model.AdminStop)
Expand All @@ -83,6 +84,7 @@ func (s *feedStateManagerSuite) TestHandleJob(c *check.C) {
manager.Tick(state)
tester.MustApplyPatches()
c.Assert(manager.ShouldRunning(), check.IsTrue)
c.Assert(manager.ShouldRemoved(), check.IsFalse)
c.Assert(state.Info.State, check.Equals, model.StateNormal)
c.Assert(state.Info.AdminJobType, check.Equals, model.AdminNone)
c.Assert(state.Status.AdminJobType, check.Equals, model.AdminNone)
Expand All @@ -95,6 +97,7 @@ func (s *feedStateManagerSuite) TestHandleJob(c *check.C) {
manager.Tick(state)
tester.MustApplyPatches()
c.Assert(manager.ShouldRunning(), check.IsFalse)
c.Assert(manager.ShouldRemoved(), check.IsTrue)
c.Assert(state.Exist(), check.IsFalse)
}

Expand Down Expand Up @@ -217,6 +220,7 @@ func (s *feedStateManagerSuite) TestHandleError(c *check.C) {
tester.MustApplyPatches()
}
c.Assert(manager.ShouldRunning(), check.IsFalse)
c.Assert(manager.ShouldRemoved(), check.IsFalse)
c.Assert(state.Info.State, check.Equals, model.StateError)
c.Assert(state.Info.AdminJobType, check.Equals, model.AdminStop)
c.Assert(state.Status.AdminJobType, check.Equals, model.AdminStop)
Expand All @@ -234,6 +238,7 @@ func (s *feedStateManagerSuite) TestChangefeedStatusNotExist(c *check.C) {
})
manager.Tick(state)
c.Assert(manager.ShouldRunning(), check.IsFalse)
c.Assert(manager.ShouldRemoved(), check.IsFalse)
tester.MustApplyPatches()

manager.PushAdminJob(&model.AdminJob{
Expand All @@ -243,6 +248,7 @@ func (s *feedStateManagerSuite) TestChangefeedStatusNotExist(c *check.C) {
})
manager.Tick(state)
c.Assert(manager.ShouldRunning(), check.IsFalse)
c.Assert(manager.ShouldRemoved(), check.IsTrue)
tester.MustApplyPatches()
c.Assert(state.Info, check.IsNil)
c.Assert(state.Exist(), check.IsFalse)
Expand Down
Loading

0 comments on commit feeba8f

Please sign in to comment.