Skip to content

Commit

Permalink
owner(ticdc): Add bootstrap and try to fix the meta information in it (
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jan 12, 2022
1 parent 65839c2 commit e572735
Show file tree
Hide file tree
Showing 15 changed files with 488 additions and 61 deletions.
21 changes: 19 additions & 2 deletions cdc/http_api_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/httputil"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -99,7 +98,7 @@ func (s *Server) handleChangefeedsList(w http.ResponseWriter, req *http.Request)
writeInternalServerErrorJSON(w, err)
return
}
if !httputil.IsFiltered(state, cfInfo.State) {
if !isFiltered(state, cfInfo.State) {
continue
}
cfStatus, _, err := s.etcdClient.GetChangeFeedStatus(req.Context(), changefeedID)
Expand Down Expand Up @@ -144,3 +143,21 @@ func writeErrorJSON(w http.ResponseWriter, statusCode int, cerr errors.Error) {
log.Error("fail to write data", zap.Error(err))
}
}

// isFiltered return true if the given feedState matches the whiteList.
func isFiltered(whiteList string, feedState model.FeedState) bool {
if whiteList == "all" {
return true
}
if whiteList == "" {
switch feedState {
case model.StateNormal:
return true
case model.StateStopped:
return true
case model.StateFailed:
return true
}
}
return whiteList == string(feedState)
}
10 changes: 10 additions & 0 deletions cdc/model/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,13 @@ func (c *CaptureInfo) Unmarshal(data []byte) error {
return errors.Annotatef(cerror.WrapError(cerror.ErrUnmarshalFailed, err),
"unmarshal data: %v", data)
}

// ListVersionsFromCaptureInfos returns the version list of the CaptureInfo list.
func ListVersionsFromCaptureInfos(captureInfos []*CaptureInfo) []string {
var captureVersions []string
for _, ci := range captureInfos {
captureVersions = append(captureVersions, ci.Version)
}

return captureVersions
}
18 changes: 18 additions & 0 deletions cdc/model/capture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,21 @@ func (s *captureSuite) TestMarshalUnmarshal(c *check.C) {
c.Assert(err, check.IsNil)
c.Assert(decodedInfo, check.DeepEquals, info)
}

func (s *captureSuite) TestListVersionsFromCaptureInfos(c *check.C) {
defer testleak.AfterTest(c)()
infos := []*CaptureInfo{
{
ID: "9ff52aca-aea6-4022-8ec4-fbee3f2c7891",
AdvertiseAddr: "127.0.0.1:8300",
Version: "dev",
},
{
ID: "9ff52aca-aea6-4022-8ec4-fbee3f2c7891",
AdvertiseAddr: "127.0.0.1:8300",
Version: "",
},
}

c.Assert(ListVersionsFromCaptureInfos(infos), check.DeepEquals, []string{"dev", ""})
}
59 changes: 54 additions & 5 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/cyclic/mark"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/version"
"go.uber.org/zap"
)

Expand All @@ -48,7 +49,7 @@ const (
StateError FeedState = "error"
StateFailed FeedState = "failed"
StateStopped FeedState = "stopped"
StateRemoved FeedState = "removed" // deprecated, will be removed in the next version
StateRemoved FeedState = "removed"
StateFinished FeedState = "finished"
)

Expand Down Expand Up @@ -208,10 +209,10 @@ func (info *ChangeFeedInfo) Clone() (*ChangeFeedInfo, error) {
return cloned, err
}

// VerifyAndFix verifies changefeed info and may fillin some fields.
// If a must field is not provided, return an error.
// If some necessary filed is missing but can use a default value, fillin it.
func (info *ChangeFeedInfo) VerifyAndFix() error {
// VerifyAndComplete verifies changefeed info and may fill in some fields.
// If a required field is not provided, return an error.
// If some necessary filed is missing but can use a default value, fill in it.
func (info *ChangeFeedInfo) VerifyAndComplete() error {
defaultConfig := config.GetDefaultReplicaConfig()
if info.Engine == "" {
info.Engine = SortUnified
Expand All @@ -234,6 +235,54 @@ func (info *ChangeFeedInfo) VerifyAndFix() error {
return nil
}

// FixIncompatible fixes incompatible changefeed meta info.
func (info *ChangeFeedInfo) FixIncompatible() {
creatorVersionGate := version.NewCreatorVersionGate(info.CreatorVersion)
if creatorVersionGate.ChangefeedStateFromAdminJob() {
log.Info("Start fixing incompatible changefeed state", zap.Any("changefeed", info))
info.fixState()
log.Info("Fix incompatibility changefeed state completed", zap.Any("changefeed", info))
}
}

// fixState attempts to fix state loss from upgrading the old owner to the new owner.
func (info *ChangeFeedInfo) fixState() {
// Notice: In the old owner we used AdminJobType field to determine if the task was paused or not,
// we need to handle this field in the new owner.
// Otherwise, we will see that the old version of the task is paused and then upgraded,
// and the task is automatically resumed after the upgrade.
state := info.State
// Upgrading from an old owner, we need to deal with cases where the state is normal,
// but actually contains errors and does not match the admin job type.
if state == StateNormal {
switch info.AdminJobType {
// This corresponds to the case of failure or error.
case AdminNone, AdminResume:
if info.Error != nil {
if cerror.ChangefeedFastFailErrorCode(errors.RFCErrorCode(info.Error.Code)) {
state = StateFailed
} else {
state = StateError
}
}
case AdminStop:
state = StateStopped
case AdminFinish:
state = StateFinished
case AdminRemove:
state = StateRemoved
}
}

if state != info.State {
log.Info("handle old owner inconsistent state",
zap.String("old state", string(info.State)),
zap.String("admin job type", info.AdminJobType.String()),
zap.String("new state", string(state)))
info.State = state
}
}

// CheckErrorHistory checks error history of a changefeed
// if having error record older than GC interval, set needSave to true.
// if error counts reach threshold, set canInit to false.
Expand Down
149 changes: 147 additions & 2 deletions cdc/model/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (s *configSuite) TestFillV1(c *check.C) {
})
}

func (s *configSuite) TestVerifyAndFix(c *check.C) {
func (s *configSuite) TestVerifyAndComplete(c *check.C) {
defer testleak.AfterTest(c)()
info := &ChangeFeedInfo{
SinkURI: "blackhole://",
Expand All @@ -178,7 +178,7 @@ func (s *configSuite) TestVerifyAndFix(c *check.C) {
},
}

err := info.VerifyAndFix()
err := info.VerifyAndComplete()
c.Assert(err, check.IsNil)
c.Assert(info.Engine, check.Equals, SortUnified)

Expand Down Expand Up @@ -358,3 +358,148 @@ func (s *changefeedSuite) TestGetTs(c *check.C) {
status := &ChangeFeedStatus{CheckpointTs: checkpointTs}
c.Assert(info.GetCheckpointTs(status), check.Equals, checkpointTs)
}

func (s *changefeedSuite) TestFixIncompatible(c *check.C) {
defer testleak.AfterTest(c)()
// Test to fix incompatible states.
testCases := []struct {
info *ChangeFeedInfo
expectedState FeedState
}{
{
info: &ChangeFeedInfo{
AdminJobType: AdminStop,
State: StateNormal,
Error: nil,
CreatorVersion: "",
},
expectedState: StateStopped,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminStop,
State: StateNormal,
Error: nil,
CreatorVersion: "4.0.14",
},
expectedState: StateStopped,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminStop,
State: StateNormal,
Error: nil,
CreatorVersion: "5.0.5",
},
expectedState: StateStopped,
},
}

for _, tc := range testCases {
tc.info.FixIncompatible()
c.Assert(tc.info.State, check.Equals, tc.expectedState)
}
}

func (s *changefeedSuite) TestFixState(c *check.C) {
defer testleak.AfterTest(c)()

testCases := []struct {
info *ChangeFeedInfo
expectedState FeedState
}{
{
info: &ChangeFeedInfo{
AdminJobType: AdminNone,
State: StateNormal,
Error: nil,
},
expectedState: StateNormal,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminResume,
State: StateNormal,
Error: nil,
},
expectedState: StateNormal,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminNone,
State: StateNormal,
Error: &RunningError{
Code: string(cerror.ErrGCTTLExceeded.RFCCode()),
},
},
expectedState: StateFailed,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminResume,
State: StateNormal,
Error: &RunningError{
Code: string(cerror.ErrGCTTLExceeded.RFCCode()),
},
},
expectedState: StateFailed,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminNone,
State: StateNormal,
Error: &RunningError{
Code: string(cerror.ErrUnknownSortEngine.RFCCode()),
},
},
expectedState: StateError,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminResume,
State: StateNormal,
Error: &RunningError{
Code: string(cerror.ErrUnknownSortEngine.RFCCode()),
},
},
expectedState: StateError,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminStop,
State: StateNormal,
Error: nil,
},
expectedState: StateStopped,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminFinish,
State: StateNormal,
Error: nil,
},
expectedState: StateFinished,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminRemove,
State: StateNormal,
Error: nil,
},
expectedState: StateRemoved,
},
{
info: &ChangeFeedInfo{
AdminJobType: AdminRemove,
State: StateNormal,
Error: nil,
},
expectedState: StateRemoved,
},
}

for _, tc := range testCases {
tc.info.fixState()
c.Assert(tc.info.State, check.Equals, tc.expectedState)
}
}
2 changes: 1 addition & 1 deletion cdc/model/reactor_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (s *ChangefeedReactorState) UpdateCDCKey(key *etcd.CDCKey, value []byte) er
return errors.Trace(err)
}
if key.Tp == etcd.CDCKeyTypeChangefeedInfo {
if err := s.Info.VerifyAndFix(); err != nil {
if err := s.Info.VerifyAndComplete(); err != nil {
return errors.Trace(err)
}
}
Expand Down
Loading

0 comments on commit e572735

Please sign in to comment.