Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

owner(ticdc): Add bootstrap and try to fix the meta information in it (#3838) #3865

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cdc/capture/http_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func verifyCreateChangefeedConfig(ctx context.Context, changefeedConfig model.Ch
return nil, err
}
// set sortEngine and EnableOldValue
cdcClusterVer, err := version.GetTiCDCClusterVersion(captureInfos)
cdcClusterVer, err := version.GetTiCDCClusterVersion(model.ListVersionsFromCaptureInfos(captureInfos))
if err != nil {
return nil, err
}
Expand Down
10 changes: 10 additions & 0 deletions cdc/model/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,13 @@ func (c *CaptureInfo) Unmarshal(data []byte) error {
return errors.Annotatef(cerror.WrapError(cerror.ErrUnmarshalFailed, err),
"unmarshal data: %v", data)
}

// ListVersionsFromCaptureInfos returns the version list of the CaptureInfo list.
func ListVersionsFromCaptureInfos(captureInfos []*CaptureInfo) []string {
var captureVersions []string
for _, ci := range captureInfos {
captureVersions = append(captureVersions, ci.Version)
}

return captureVersions
}
17 changes: 17 additions & 0 deletions cdc/model/capture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,20 @@ func TestMarshalUnmarshal(t *testing.T) {
require.Nil(t, err)
require.Equal(t, info, decodedInfo)
}

func TestListVersionsFromCaptureInfos(t *testing.T) {
infos := []*CaptureInfo{
{
ID: "9ff52aca-aea6-4022-8ec4-fbee3f2c7891",
AdvertiseAddr: "127.0.0.1:8300",
Version: "dev",
},
{
ID: "9ff52aca-aea6-4022-8ec4-fbee3f2c7891",
AdvertiseAddr: "127.0.0.1:8300",
Version: "",
},
}

require.ElementsMatch(t, []string{"dev", ""}, ListVersionsFromCaptureInfos(infos))
}
59 changes: 54 additions & 5 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/cyclic/mark"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/version"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
)
Expand All @@ -48,7 +49,7 @@ const (
StateError FeedState = "error"
StateFailed FeedState = "failed"
StateStopped FeedState = "stopped"
StateRemoved FeedState = "removed" // deprecated, will be removed in the next version
StateRemoved FeedState = "removed"
StateFinished FeedState = "finished"
)

Expand Down Expand Up @@ -226,10 +227,10 @@ func (info *ChangeFeedInfo) Clone() (*ChangeFeedInfo, error) {
return cloned, err
}

// VerifyAndFix verifies changefeed info and may fillin some fields.
// If a must field is not provided, return an error.
// If some necessary filed is missing but can use a default value, fillin it.
func (info *ChangeFeedInfo) VerifyAndFix() error {
// VerifyAndComplete verifies changefeed info and may fill in some fields.
// If a required field is not provided, return an error.
// If some necessary filed is missing but can use a default value, fill in it.
func (info *ChangeFeedInfo) VerifyAndComplete() error {
defaultConfig := config.GetDefaultReplicaConfig()
if info.Engine == "" {
info.Engine = SortUnified
Expand All @@ -255,6 +256,54 @@ func (info *ChangeFeedInfo) VerifyAndFix() error {
return nil
}

// FixIncompatible fixes incompatible changefeed meta info.
func (info *ChangeFeedInfo) FixIncompatible() {
creatorVersionGate := version.NewCreatorVersionGate(info.CreatorVersion)
if creatorVersionGate.ChangefeedStateFromAdminJob() {
log.Info("Start fixing incompatible changefeed state", zap.Any("changefeed", info))
info.fixState()
log.Info("Fix incompatibility changefeed state completed", zap.Any("changefeed", info))
}
}

// fixState attempts to fix state loss from upgrading the old owner to the new owner.
func (info *ChangeFeedInfo) fixState() {
// Notice: In the old owner we used AdminJobType field to determine if the task was paused or not,
// we need to handle this field in the new owner.
// Otherwise, we will see that the old version of the task is paused and then upgraded,
// and the task is automatically resumed after the upgrade.
state := info.State
// Upgrading from an old owner, we need to deal with cases where the state is normal,
// but actually contains errors and does not match the admin job type.
if state == StateNormal {
switch info.AdminJobType {
// This corresponds to the case of failure or error.
case AdminNone, AdminResume:
if info.Error != nil {
if cerror.ChangefeedFastFailErrorCode(errors.RFCErrorCode(info.Error.Code)) {
state = StateFailed
} else {
state = StateError
}
}
case AdminStop:
state = StateStopped
case AdminFinish:
state = StateFinished
case AdminRemove:
state = StateRemoved
}
}

if state != info.State {
log.Info("handle old owner inconsistent state",
zap.String("old state", string(info.State)),
zap.String("admin job type", info.AdminJobType.String()),
zap.String("new state", string(state)))
info.State = state
}
}

// CheckErrorHistory checks error history of a changefeed
// if having error record older than GC interval, set needSave to true.
// if error counts reach threshold, set canInit to false.
Expand Down
148 changes: 146 additions & 2 deletions cdc/model/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func TestFillV1(t *testing.T) {
}, cfg)
}

func TestVerifyAndFix(t *testing.T) {
func TestVerifyAndComplete(t *testing.T) {
t.Parallel()

info := &ChangeFeedInfo{
Expand All @@ -177,7 +177,7 @@ func TestVerifyAndFix(t *testing.T) {
},
}

err := info.VerifyAndFix()
err := info.VerifyAndComplete()
require.Nil(t, err)
require.Equal(t, SortUnified, info.Engine)

Expand All @@ -189,6 +189,150 @@ func TestVerifyAndFix(t *testing.T) {
require.Equal(t, marshalConfig2, marshalConfig1)
}

func TestFixIncompatible(t *testing.T) {
// Test to fix incompatible states.
testCases := []struct {
info *ChangeFeedInfo
expectedState FeedState
}{
{
info: &ChangeFeedInfo{
AdminJobType: AdminStop,
State: StateNormal,
Error: nil,
CreatorVersion: "",
},
expectedState: StateStopped,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminStop,
State: StateNormal,
Error: nil,
CreatorVersion: "4.0.14",
},
expectedState: StateStopped,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminStop,
State: StateNormal,
Error: nil,
CreatorVersion: "5.0.5",
},
expectedState: StateStopped,
},
}

for _, tc := range testCases {
tc.info.FixIncompatible()
require.Equal(t, tc.expectedState, tc.info.State)
}
}

func TestFixState(t *testing.T) {
t.Parallel()

testCases := []struct {
info *ChangeFeedInfo
expectedState FeedState
}{
{
info: &ChangeFeedInfo{
AdminJobType: AdminNone,
State: StateNormal,
Error: nil,
},
expectedState: StateNormal,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminResume,
State: StateNormal,
Error: nil,
},
expectedState: StateNormal,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminNone,
State: StateNormal,
Error: &RunningError{
Code: string(cerror.ErrGCTTLExceeded.RFCCode()),
},
},
expectedState: StateFailed,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminResume,
State: StateNormal,
Error: &RunningError{
Code: string(cerror.ErrGCTTLExceeded.RFCCode()),
},
},
expectedState: StateFailed,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminNone,
State: StateNormal,
Error: &RunningError{
Code: string(cerror.ErrClusterIDMismatch.RFCCode()),
},
},
expectedState: StateError,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminResume,
State: StateNormal,
Error: &RunningError{
Code: string(cerror.ErrClusterIDMismatch.RFCCode()),
},
},
expectedState: StateError,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminStop,
State: StateNormal,
Error: nil,
},
expectedState: StateStopped,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminFinish,
State: StateNormal,
Error: nil,
},
expectedState: StateFinished,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminRemove,
State: StateNormal,
Error: nil,
},
expectedState: StateRemoved,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminRemove,
State: StateNormal,
Error: nil,
},
expectedState: StateRemoved,
},
}

for _, tc := range testCases {
tc.info.fixState()
require.Equal(t, tc.expectedState, tc.info.State)
}
}

func TestChangeFeedInfoClone(t *testing.T) {
t.Parallel()

Expand Down
34 changes: 33 additions & 1 deletion cdc/owner/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ type Owner struct {
logLimiter *rate.Limiter
lastTickTime time.Time
closed int32
// bootstrapped specifies whether the owner has been initialized.
// This will only be done when the owner starts the first Tick.
// NOTICE: Do not use it in a method other than tick unexpectedly, as it is not a thread-safe value.
bootstrapped bool

newChangefeed func(id model.ChangeFeedID, gcManager gc.Manager) *changefeed
}
Expand All @@ -108,6 +112,8 @@ func NewOwner4Test(
pdClient pd.Client,
) *Owner {
o := NewOwner(pdClient)
// Most tests do not need to test bootstrap.
o.bootstrapped = true
o.newChangefeed = func(id model.ChangeFeedID, gcManager gc.Manager) *changefeed {
return newChangefeed4Test(id, gcManager, newDDLPuller, newSink)
}
Expand All @@ -120,8 +126,15 @@ func (o *Owner) Tick(stdCtx context.Context, rawState orchestrator.ReactorState)
failpoint.Return(nil, errors.New("owner run with injected error"))
})
failpoint.Inject("sleep-in-owner-tick", nil)
ctx := stdCtx.(cdcContext.Context)
state := rawState.(*orchestrator.GlobalReactorState)
// At the first Tick, we need to do a bootstrap operation.
// Fix incompatible or incorrect meta information.
if !o.bootstrapped {
o.Bootstrap(state)
o.bootstrapped = true
return state, nil
}

o.captures = state.Captures
o.updateMetrics(state)

Expand All @@ -143,6 +156,7 @@ func (o *Owner) Tick(stdCtx context.Context, rawState orchestrator.ReactorState)
return nil, errors.Trace(err)
}

ctx := stdCtx.(cdcContext.Context)
for changefeedID, changefeedState := range state.Changefeeds {
if changefeedState.Info == nil {
o.cleanUpChangefeed(changefeedState)
Expand Down Expand Up @@ -256,6 +270,24 @@ func (o *Owner) cleanUpChangefeed(state *orchestrator.ChangefeedReactorState) {
}
}

// Bootstrap checks if the state contains incompatible or incorrect information and tries to fix it.
func (o *Owner) Bootstrap(state *orchestrator.GlobalReactorState) {
log.Info("Start bootstrapping", zap.Any("state", state))
fixChangefeedInfos(state)
}

// fixChangefeedInfos attempts to fix incompatible or incorrect meta information in changefeed state.
func fixChangefeedInfos(state *orchestrator.GlobalReactorState) {
for _, changefeedState := range state.Changefeeds {
if changefeedState != nil {
changefeedState.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
info.FixIncompatible()
return info, true, nil
})
}
}
}

func (o *Owner) updateMetrics(state *orchestrator.GlobalReactorState) {
// Keep the value of prometheus expression `rate(counter)` = 1
// Please also change alert rule in ticdc.rules.yml when change the expression value.
Expand Down
Loading