Skip to content

Commit

Permalink
Merge branch 'release-5.2' into cherry-pick-4198-to-release-5.2
Browse files Browse the repository at this point in the history
  • Loading branch information
overvenus authored Jan 18, 2022
2 parents cf0d005 + 87115bc commit 54eb336
Show file tree
Hide file tree
Showing 36 changed files with 847 additions and 245 deletions.
3 changes: 2 additions & 1 deletion cdc/capture/http_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ var httpBadRequestError = []*errors.Error{
cerror.ErrAPIInvalidParam, cerror.ErrSinkURIInvalid, cerror.ErrStartTsBeforeGC,
cerror.ErrChangeFeedNotExists, cerror.ErrTargetTsBeforeStartTs, cerror.ErrTableIneligible,
cerror.ErrFilterRuleInvalid, cerror.ErrChangefeedUpdateRefused, cerror.ErrMySQLConnectionError,
cerror.ErrMySQLInvalidConfig,
cerror.ErrMySQLInvalidConfig, cerror.ErrCaptureNotExist, cerror.ErrTaskStatusNotExists,
cerror.ErrTaskPositionNotExists,
}

// IsHTTPBadRequestError check if a error is a http bad request error
Expand Down
2 changes: 1 addition & 1 deletion cdc/capture/http_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func verifyCreateChangefeedConfig(ctx context.Context, changefeedConfig model.Ch
return nil, err
}
// set sortEngine and EnableOldValue
cdcClusterVer, err := version.GetTiCDCClusterVersion(captureInfos)
cdcClusterVer, err := version.GetTiCDCClusterVersion(model.ListVersionsFromCaptureInfos(captureInfos))
if err != nil {
return nil, err
}
Expand Down
10 changes: 10 additions & 0 deletions cdc/model/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,13 @@ func (c *CaptureInfo) Unmarshal(data []byte) error {
return errors.Annotatef(cerror.WrapError(cerror.ErrUnmarshalFailed, err),
"unmarshal data: %v", data)
}

// ListVersionsFromCaptureInfos returns the version list of the CaptureInfo list.
func ListVersionsFromCaptureInfos(captureInfos []*CaptureInfo) []string {
var captureVersions []string
for _, ci := range captureInfos {
captureVersions = append(captureVersions, ci.Version)
}

return captureVersions
}
17 changes: 17 additions & 0 deletions cdc/model/capture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,20 @@ func (s *captureSuite) TestMarshalUnmarshal(c *check.C) {
c.Assert(err, check.IsNil)
c.Assert(decodedInfo, check.DeepEquals, info)
}

func (s *captureSuite) TestListVersionsFromCaptureInfos(c *check.C) {
defer testleak.AfterTest(c)()
infos := []*CaptureInfo{
{
ID: "9ff52aca-aea6-4022-8ec4-fbee3f2c7891",
AdvertiseAddr: "127.0.0.1:8300",
Version: "dev",
},
{
ID: "9ff52aca-aea6-4022-8ec4-fbee3f2c7891",
AdvertiseAddr: "127.0.0.1:8300",
Version: "",
},
}
c.Assert(ListVersionsFromCaptureInfos(infos), check.DeepEquals, []string{"dev", ""})
}
59 changes: 54 additions & 5 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/cyclic/mark"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/version"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
)
Expand All @@ -48,7 +49,7 @@ const (
StateError FeedState = "error"
StateFailed FeedState = "failed"
StateStopped FeedState = "stopped"
StateRemoved FeedState = "removed" // deprecated, will be removed in the next version
StateRemoved FeedState = "removed"
StateFinished FeedState = "finished"
)

Expand Down Expand Up @@ -226,10 +227,10 @@ func (info *ChangeFeedInfo) Clone() (*ChangeFeedInfo, error) {
return cloned, err
}

// VerifyAndFix verifies changefeed info and may fillin some fields.
// If a must field is not provided, return an error.
// If some necessary filed is missing but can use a default value, fillin it.
func (info *ChangeFeedInfo) VerifyAndFix() error {
// VerifyAndComplete verifies changefeed info and may fill in some fields.
// If a required field is not provided, return an error.
// If some necessary filed is missing but can use a default value, fill in it.
func (info *ChangeFeedInfo) VerifyAndComplete() error {
defaultConfig := config.GetDefaultReplicaConfig()
if info.Engine == "" {
info.Engine = SortUnified
Expand All @@ -252,6 +253,54 @@ func (info *ChangeFeedInfo) VerifyAndFix() error {
return nil
}

// FixIncompatible fixes incompatible changefeed meta info.
func (info *ChangeFeedInfo) FixIncompatible() {
creatorVersionGate := version.NewCreatorVersionGate(info.CreatorVersion)
if creatorVersionGate.ChangefeedStateFromAdminJob() {
log.Info("Start fixing incompatible changefeed state", zap.Any("changefeed", info))
info.fixState()
log.Info("Fix incompatibility changefeed state completed", zap.Any("changefeed", info))
}
}

// fixState attempts to fix state loss from upgrading the old owner to the new owner.
func (info *ChangeFeedInfo) fixState() {
// Notice: In the old owner we used AdminJobType field to determine if the task was paused or not,
// we need to handle this field in the new owner.
// Otherwise, we will see that the old version of the task is paused and then upgraded,
// and the task is automatically resumed after the upgrade.
state := info.State
// Upgrading from an old owner, we need to deal with cases where the state is normal,
// but actually contains errors and does not match the admin job type.
if state == StateNormal {
switch info.AdminJobType {
// This corresponds to the case of failure or error.
case AdminNone, AdminResume:
if info.Error != nil {
if cerror.ChangefeedFastFailErrorCode(errors.RFCErrorCode(info.Error.Code)) {
state = StateFailed
} else {
state = StateError
}
}
case AdminStop:
state = StateStopped
case AdminFinish:
state = StateFinished
case AdminRemove:
state = StateRemoved
}
}

if state != info.State {
log.Info("handle old owner inconsistent state",
zap.String("old state", string(info.State)),
zap.String("admin job type", info.AdminJobType.String()),
zap.String("new state", string(state)))
info.State = state
}
}

// CheckErrorHistory checks error history of a changefeed
// if having error record older than GC interval, set needSave to true.
// if error counts reach threshold, set canInit to false.
Expand Down
150 changes: 148 additions & 2 deletions cdc/model/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (s *configSuite) TestFillV1(c *check.C) {
})
}

func (s *configSuite) TestVerifyAndFix(c *check.C) {
func (s *configSuite) TestVerifyAndComplete(c *check.C) {
defer testleak.AfterTest(c)()
info := &ChangeFeedInfo{
SinkURI: "blackhole://",
Expand All @@ -178,7 +178,7 @@ func (s *configSuite) TestVerifyAndFix(c *check.C) {
},
}

err := info.VerifyAndFix()
err := info.VerifyAndComplete()
c.Assert(err, check.IsNil)
c.Assert(info.Engine, check.Equals, SortUnified)

Expand All @@ -190,6 +190,152 @@ func (s *configSuite) TestVerifyAndFix(c *check.C) {
c.Assert(marshalConfig1, check.Equals, marshalConfig2)
}

func (s *configSuite) TestFixIncompatible(c *check.C) {
defer testleak.AfterTest(c)()

// Test to fix incompatible states.
testCases := []struct {
info *ChangeFeedInfo
expectedState FeedState
}{
{
info: &ChangeFeedInfo{
AdminJobType: AdminStop,
State: StateNormal,
Error: nil,
CreatorVersion: "",
},
expectedState: StateStopped,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminStop,
State: StateNormal,
Error: nil,
CreatorVersion: "4.0.14",
},
expectedState: StateStopped,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminStop,
State: StateNormal,
Error: nil,
CreatorVersion: "5.0.5",
},
expectedState: StateStopped,
},
}

for _, tc := range testCases {
tc.info.FixIncompatible()
c.Assert(tc.info.State, check.Equals, tc.expectedState)
}
}

func (s *configSuite) TestFixState(c *check.C) {
defer testleak.AfterTest(c)()

testCases := []struct {
info *ChangeFeedInfo
expectedState FeedState
}{
{
info: &ChangeFeedInfo{
AdminJobType: AdminNone,
State: StateNormal,
Error: nil,
},
expectedState: StateNormal,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminResume,
State: StateNormal,
Error: nil,
},
expectedState: StateNormal,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminNone,
State: StateNormal,
Error: &RunningError{
Code: string(cerror.ErrGCTTLExceeded.RFCCode()),
},
},
expectedState: StateFailed,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminResume,
State: StateNormal,
Error: &RunningError{
Code: string(cerror.ErrGCTTLExceeded.RFCCode()),
},
},
expectedState: StateFailed,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminNone,
State: StateNormal,
Error: &RunningError{
Code: string(cerror.ErrKafkaNewSaramaProducer.RFCCode()),
},
},
expectedState: StateError,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminResume,
State: StateNormal,
Error: &RunningError{
Code: string(cerror.ErrKafkaNewSaramaProducer.RFCCode()),
},
},
expectedState: StateError,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminStop,
State: StateNormal,
Error: nil,
},
expectedState: StateStopped,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminFinish,
State: StateNormal,
Error: nil,
},
expectedState: StateFinished,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminRemove,
State: StateNormal,
Error: nil,
},
expectedState: StateRemoved,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminRemove,
State: StateNormal,
Error: nil,
},
expectedState: StateRemoved,
},
}

for _, tc := range testCases {
tc.info.fixState()
c.Assert(tc.info.State, check.Equals, tc.expectedState)
}
}

func (s *configSuite) TestChangeFeedInfoClone(c *check.C) {
defer testleak.AfterTest(c)()
info := &ChangeFeedInfo{
Expand Down
2 changes: 1 addition & 1 deletion cdc/model/reactor_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (s *ChangefeedReactorState) UpdateCDCKey(key *etcd.CDCKey, value []byte) er
return errors.Trace(err)
}
if key.Tp == etcd.CDCKeyTypeChangefeedInfo {
if err := s.Info.VerifyAndFix(); err != nil {
if err := s.Info.VerifyAndComplete(); err != nil {
return errors.Trace(err)
}
}
Expand Down
5 changes: 0 additions & 5 deletions cdc/owner/async_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ const (
// The EmitCheckpointTs and EmitDDLEvent is asynchronous function for now
// Other functions are still synchronization
type AsyncSink interface {
Initialize(ctx cdcContext.Context, tableInfo []*model.SimpleTableInfo) error
// EmitCheckpointTs emits the checkpoint Ts to downstream data source
// this function will return after recording the checkpointTs specified in memory immediately
// and the recorded checkpointTs will be sent and updated to downstream data source every second
Expand Down Expand Up @@ -101,10 +100,6 @@ func newAsyncSink(ctx cdcContext.Context) (AsyncSink, error) {
return asyncSink, nil
}

func (s *asyncSinkImpl) Initialize(ctx cdcContext.Context, tableInfo []*model.SimpleTableInfo) error {
return s.sink.Initialize(ctx, tableInfo)
}

func (s *asyncSinkImpl) run(ctx cdcContext.Context) {
defer s.wg.Done()
// TODO make the tick duration configurable
Expand Down
25 changes: 4 additions & 21 deletions cdc/owner/async_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,10 @@ type asyncSinkSuite struct {

type mockSink struct {
sink.Sink
initTableInfo []*model.SimpleTableInfo
checkpointTs model.Ts
ddl *model.DDLEvent
ddlMu sync.Mutex
ddlError error
}

func (m *mockSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
m.initTableInfo = tableInfo
return nil
checkpointTs model.Ts
ddl *model.DDLEvent
ddlMu sync.Mutex
ddlError error
}

func (m *mockSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
Expand Down Expand Up @@ -88,17 +82,6 @@ func newAsyncSink4Test(ctx cdcContext.Context, c *check.C) (cdcContext.Context,
return ctx, sink, mockSink
}

func (s *asyncSinkSuite) TestInitialize(c *check.C) {
defer testleak.AfterTest(c)()
ctx := cdcContext.NewBackendContext4Test(false)
ctx, sink, mockSink := newAsyncSink4Test(ctx, c)
defer sink.Close(ctx)
tableInfos := []*model.SimpleTableInfo{{Schema: "test"}}
err := sink.Initialize(ctx, tableInfos)
c.Assert(err, check.IsNil)
c.Assert(tableInfos, check.DeepEquals, mockSink.initTableInfo)
}

func (s *asyncSinkSuite) TestCheckpoint(c *check.C) {
defer testleak.AfterTest(c)()
ctx := cdcContext.NewBackendContext4Test(false)
Expand Down
5 changes: 1 addition & 4 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,10 +254,7 @@ LOOP:
if err != nil {
return errors.Trace(err)
}
err = c.sink.Initialize(cancelCtx, c.schema.SinkTableInfos())
if err != nil {
return errors.Trace(err)
}

// Refer to the previous comment on why we use (checkpointTs-1).
c.ddlPuller, err = c.newDDLPuller(cancelCtx, checkpointTs-1)
if err != nil {
Expand Down
Loading

0 comments on commit 54eb336

Please sign in to comment.