Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeed (ticdc): remove resolvedTs from etcd #9194

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 22 additions & 12 deletions cdc/api/v1/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,28 +53,38 @@ type testCase struct {
method string
}

func (p *mockStatusProvider) GetAllChangeFeedStatuses(ctx context.Context) (map[model.ChangeFeedID]*model.ChangeFeedStatus, error) {
func (p *mockStatusProvider) GetAllChangeFeedStatuses(ctx context.Context) (
map[model.ChangeFeedID]*model.ChangeFeedStatusForAPI, error,
) {
args := p.Called(ctx)
return args.Get(0).(map[model.ChangeFeedID]*model.ChangeFeedStatus), args.Error(1)
return args.Get(0).(map[model.ChangeFeedID]*model.ChangeFeedStatusForAPI), args.Error(1)
}

func (p *mockStatusProvider) GetChangeFeedStatus(ctx context.Context, changefeedID model.ChangeFeedID) (*model.ChangeFeedStatus, error) {
func (p *mockStatusProvider) GetChangeFeedStatus(ctx context.Context, changefeedID model.ChangeFeedID) (
*model.ChangeFeedStatusForAPI, error,
) {
args := p.Called(ctx, changefeedID)
log.Info("err", zap.Error(args.Error(1)))
return args.Get(0).(*model.ChangeFeedStatus), args.Error(1)
return args.Get(0).(*model.ChangeFeedStatusForAPI), args.Error(1)
}

func (p *mockStatusProvider) GetAllChangeFeedInfo(ctx context.Context) (map[model.ChangeFeedID]*model.ChangeFeedInfo, error) {
func (p *mockStatusProvider) GetAllChangeFeedInfo(ctx context.Context) (
map[model.ChangeFeedID]*model.ChangeFeedInfo, error,
) {
args := p.Called(ctx)
return args.Get(0).(map[model.ChangeFeedID]*model.ChangeFeedInfo), args.Error(1)
}

func (p *mockStatusProvider) GetChangeFeedInfo(ctx context.Context, changefeedID model.ChangeFeedID) (*model.ChangeFeedInfo, error) {
func (p *mockStatusProvider) GetChangeFeedInfo(ctx context.Context, changefeedID model.ChangeFeedID) (
*model.ChangeFeedInfo, error,
) {
args := p.Called(ctx)
return args.Get(0).(*model.ChangeFeedInfo), args.Error(1)
}

func (p *mockStatusProvider) GetAllTaskStatuses(ctx context.Context, changefeedID model.ChangeFeedID) (map[model.CaptureID]*model.TaskStatus, error) {
func (p *mockStatusProvider) GetAllTaskStatuses(ctx context.Context, changefeedID model.ChangeFeedID) (
map[model.CaptureID]*model.TaskStatus, error,
) {
args := p.Called(ctx)
return args.Get(0).(map[model.CaptureID]*model.TaskStatus), args.Error(1)
}
Expand Down Expand Up @@ -103,17 +113,17 @@ func newRouter(c capture.Capture, p owner.StatusProvider) *gin.Engine {
func newStatusProvider() *mockStatusProvider {
statusProvider := &mockStatusProvider{}
statusProvider.On("GetChangeFeedStatus", mock.Anything, changeFeedID).
Return(&model.ChangeFeedStatus{CheckpointTs: 1}, nil)
Return(&model.ChangeFeedStatusForAPI{CheckpointTs: 1}, nil)

statusProvider.On("GetChangeFeedStatus", mock.Anything, nonExistChangefeedID).
Return(new(model.ChangeFeedStatus),
Return(new(model.ChangeFeedStatusForAPI),
cerror.ErrChangeFeedNotExists.GenWithStackByArgs(nonExistChangefeedID))

statusProvider.On("GetAllTaskStatuses", mock.Anything).
Return(map[model.CaptureID]*model.TaskStatus{captureID: {}}, nil)

statusProvider.On("GetAllChangeFeedStatuses", mock.Anything).
Return(map[model.ChangeFeedID]*model.ChangeFeedStatus{
Return(map[model.ChangeFeedID]*model.ChangeFeedStatusForAPI{
model.ChangeFeedID4Test("ab", "123"): {CheckpointTs: 1},
model.ChangeFeedID4Test("ab", "13"): {CheckpointTs: 2},
model.ChangeFeedID4Test("abc", "123"): {CheckpointTs: 1},
Expand Down Expand Up @@ -352,9 +362,9 @@ func TestRemoveChangefeed(t *testing.T) {

statusProvider := &mockStatusProvider{}
statusProvider.On("GetChangeFeedStatus", mock.Anything, changeFeedID).
Return(&model.ChangeFeedStatus{CheckpointTs: 1}, nil).Once()
Return(&model.ChangeFeedStatusForAPI{CheckpointTs: 1}, nil).Once()
statusProvider.On("GetChangeFeedStatus", mock.Anything, changeFeedID).
Return(new(model.ChangeFeedStatus),
Return(new(model.ChangeFeedStatusForAPI),
cerror.ErrChangeFeedNotExists.FastGenByArgs(changeFeedID)).Once()

router1 := newRouter(cp, statusProvider)
Expand Down
2 changes: 1 addition & 1 deletion cdc/api/v2/api_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestVerifyCreateChangefeedConfig(t *testing.T) {
cfg.ID = ""
cfg.Namespace = ""
// changefeed already exists
provider.changefeedStatus = &model.ChangeFeedStatus{}
provider.changefeedStatus = &model.ChangeFeedStatusForAPI{}
cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
require.NotNil(t, err)
provider.changefeedStatus = nil
Expand Down
8 changes: 4 additions & 4 deletions cdc/api/v2/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,19 +62,19 @@ func (c *mockPDClient) Close() {}

type mockStatusProvider struct {
owner.StatusProvider
changefeedStatus *model.ChangeFeedStatus
changefeedStatus *model.ChangeFeedStatusForAPI
changefeedInfo *model.ChangeFeedInfo
processors []*model.ProcInfoSnap
taskStatus map[model.CaptureID]*model.TaskStatus
changefeedInfos map[model.ChangeFeedID]*model.ChangeFeedInfo
changefeedStatuses map[model.ChangeFeedID]*model.ChangeFeedStatus
changefeedStatuses map[model.ChangeFeedID]*model.ChangeFeedStatusForAPI
err error
}

// GetChangeFeedStatus returns a changefeeds' runtime status.
func (m *mockStatusProvider) GetChangeFeedStatus(ctx context.Context,
changefeedID model.ChangeFeedID,
) (*model.ChangeFeedStatus, error) {
) (*model.ChangeFeedStatusForAPI, error) {
return m.changefeedStatus, m.err
}

Expand Down Expand Up @@ -114,7 +114,7 @@ func (m *mockStatusProvider) GetAllChangeFeedInfo(_ context.Context) (

// GetAllChangeFeedStatuses returns a list of mock changefeed status.
func (m *mockStatusProvider) GetAllChangeFeedStatuses(_ context.Context) (
map[model.ChangeFeedID]*model.ChangeFeedStatus,
map[model.ChangeFeedID]*model.ChangeFeedStatusForAPI,
error,
) {
return m.changefeedStatuses, m.err
Expand Down
10 changes: 5 additions & 5 deletions cdc/api/v2/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func TestGetChangeFeed(t *testing.T) {
Code: string(cerrors.ErrStartTsBeforeGC.RFCCode()),
},
}
statusProvider.changefeedStatus = &model.ChangeFeedStatus{
statusProvider.changefeedStatus = &model.ChangeFeedStatusForAPI{
CheckpointTs: 1,
}
w = httptest.NewRecorder()
Expand Down Expand Up @@ -451,7 +451,7 @@ func TestUpdateChangefeed(t *testing.T) {
Return(&model.ChangeFeedInfo{}, &model.UpstreamInfo{}, cerrors.ErrChangefeedUpdateRefused).
Times(1)

statusProvider.changefeedStatus = &model.ChangeFeedStatus{
statusProvider.changefeedStatus = &model.ChangeFeedStatusForAPI{
CheckpointTs: 1,
}
w = httptest.NewRecorder()
Expand Down Expand Up @@ -547,7 +547,7 @@ func TestListChangeFeeds(t *testing.T) {
State: model.StateStopped,
},
},
changefeedStatuses: map[model.ChangeFeedID]*model.ChangeFeedStatus{
changefeedStatuses: map[model.ChangeFeedID]*model.ChangeFeedStatusForAPI{
model.DefaultChangeFeedID("cf1"): {},
model.DefaultChangeFeedID("cf2"): {},
model.DefaultChangeFeedID("cf3"): {},
Expand Down Expand Up @@ -853,7 +853,7 @@ func TestDeleteChangefeed(t *testing.T) {

// case 4: remove changefeed
statusProvider.EXPECT().GetChangeFeedStatus(gomock.Any(), gomock.Any()).Return(
&model.ChangeFeedStatus{}, nil)
&model.ChangeFeedStatusForAPI{}, nil)
statusProvider.EXPECT().GetChangeFeedStatus(gomock.Any(), gomock.Any()).Return(
nil, cerrors.ErrChangeFeedNotExists.GenWithStackByArgs(validID))
w = httptest.NewRecorder()
Expand All @@ -864,7 +864,7 @@ func TestDeleteChangefeed(t *testing.T) {

// case 5: remove changefeed failed
statusProvider.EXPECT().GetChangeFeedStatus(gomock.Any(), gomock.Any()).AnyTimes().Return(
&model.ChangeFeedStatus{}, nil)
&model.ChangeFeedStatusForAPI{}, nil)
w = httptest.NewRecorder()
req, _ = http.NewRequestWithContext(context.Background(), remove.method,
fmt.Sprintf(remove.url, validID), nil)
Expand Down
10 changes: 10 additions & 0 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,3 +584,13 @@ func (t DownstreamType) String() string {
}
return "Unknown"
}

// ChangeFeedStatusForAPI uses to transfer the status of changefeed for API.
type ChangeFeedStatusForAPI struct {
CharlesCheung96 marked this conversation as resolved.
Show resolved Hide resolved
ResolvedTs uint64 `json:"resolved-ts"`
CheckpointTs uint64 `json:"checkpoint-ts"`
// minTableBarrierTs is the minimum commitTs of all DDL events and is only
// used to check whether there is a pending DDL job at the checkpointTs when
// initializing the changefeed.
MinTableBarrierTs uint64 `json:"min-table-barrier-ts"`
}
8 changes: 5 additions & 3 deletions cdc/model/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,14 +269,16 @@ func (p ProcessorsInfos) String() string {
}

// ChangeFeedStatus stores information about a ChangeFeed
// It is stored in etcd.
type ChangeFeedStatus struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add some comments to explain that AdminJobType should be removed in future.

ResolvedTs uint64 `json:"resolved-ts"`
CheckpointTs uint64 `json:"checkpoint-ts"`
// minTableBarrierTs is the minimum commitTs of all DDL events and is only
// used to check whether there is a pending DDL job at the checkpointTs when
// initializing the changefeed.
MinTableBarrierTs uint64 `json:"min-table-barrier-ts"`
AdminJobType AdminJobType `json:"admin-job-type"`
MinTableBarrierTs uint64 `json:"min-table-barrier-ts"`
// TODO: remove this filed after we don't use ChangeFeedStatus to
// control processor. This is too ambiguous.
AdminJobType AdminJobType `json:"admin-job-type"`
}

// Marshal returns json encoded string of ChangeFeedStatus, only contains necessary fields stored in storage
Expand Down
3 changes: 1 addition & 2 deletions cdc/model/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,9 @@ func TestChangeFeedStatusMarshal(t *testing.T) {
t.Parallel()

status := &ChangeFeedStatus{
ResolvedTs: 420875942036766723,
CheckpointTs: 420875940070686721,
}
expected := `{"resolved-ts":420875942036766723,"checkpoint-ts":420875940070686721,
expected := `{"checkpoint-ts":420875940070686721,
"min-table-barrier-ts":0,"admin-job-type":0}`
expected = strings.ReplaceAll(expected, "\n", "")

Expand Down
40 changes: 16 additions & 24 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type changefeed struct {
// and will be destroyed when a changefeed is closed.
barriers *barriers
feedStateManager *feedStateManager
resolvedTs model.Ts

// ddl related fields
ddlManager *ddlManager
Expand Down Expand Up @@ -359,7 +360,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID),
zap.Uint64("preCheckpointTs", preCheckpointTs),
zap.Uint64("preResolvedTs", c.state.Status.ResolvedTs),
zap.Uint64("preResolvedTs", c.resolvedTs),
zap.Uint64("globalBarrierTs", barrier.GlobalBarrierTs),
zap.Uint64("minTableBarrierTs", barrier.MinTableBarrierTs),
zap.Any("tableBarrier", barrier.TableBarriers))
Expand Down Expand Up @@ -390,22 +391,20 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
if c.state.Status != nil {
// We should keep the metrics updated even if the scheduler cannot
// advance the watermarks for now.
c.updateMetrics(currentTs, c.state.Status.CheckpointTs, c.state.Status.ResolvedTs)
c.updateMetrics(currentTs, c.state.Status.CheckpointTs, c.resolvedTs)
}
return nil
}

prevResolvedTs := c.state.Status.ResolvedTs
log.Debug("owner prepares to update status",
zap.Uint64("prevResolvedTs", prevResolvedTs),
zap.Uint64("prevResolvedTs", c.resolvedTs),
zap.Uint64("newResolvedTs", newResolvedTs),
zap.Uint64("newCheckpointTs", newCheckpointTs),
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID))
// resolvedTs should never regress but checkpointTs can, as checkpointTs has already
// been decreased when the owner is initialized.
if newResolvedTs < prevResolvedTs {
newResolvedTs = prevResolvedTs
// resolvedTs should never regress.
if newResolvedTs > c.resolvedTs {
c.resolvedTs = newResolvedTs
}

// MinTableBarrierTs should never regress
Expand All @@ -424,8 +423,8 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
}
})

c.updateStatus(newCheckpointTs, newResolvedTs, barrier.MinTableBarrierTs)
c.updateMetrics(currentTs, newCheckpointTs, newResolvedTs)
c.updateStatus(newCheckpointTs, barrier.MinTableBarrierTs)
c.updateMetrics(currentTs, newCheckpointTs, c.resolvedTs)
c.tickDownstreamObserver(ctx)

return nil
Expand Down Expand Up @@ -462,7 +461,10 @@ LOOP2:
}

checkpointTs := c.state.Status.CheckpointTs
resolvedTs := c.state.Status.ResolvedTs
if c.resolvedTs == 0 {
c.resolvedTs = checkpointTs
}

minTableBarrierTs := c.state.Status.MinTableBarrierTs

failpoint.Inject("NewChangefeedNoRetryError", func() {
Expand Down Expand Up @@ -527,7 +529,7 @@ LOOP2:

c.barriers = newBarriers()
if util.GetOrZero(c.state.Info.Config.EnableSyncPoint) {
c.barriers.Update(syncPointBarrier, resolvedTs)
c.barriers.Update(syncPointBarrier, c.resolvedTs)
}
c.barriers.Update(finishBarrier, c.state.Info.GetTargetTs())

Expand Down Expand Up @@ -656,7 +658,7 @@ LOOP2:
zap.String("changefeed", c.state.ID.ID),
zap.Uint64("changefeedEpoch", epoch),
zap.Uint64("checkpointTs", checkpointTs),
zap.Uint64("resolvedTs", resolvedTs),
zap.Uint64("resolvedTs", c.resolvedTs),
zap.Stringer("info", c.state.Info))

return nil
Expand Down Expand Up @@ -833,7 +835,6 @@ func (c *changefeed) preflightCheck(captures map[model.CaptureID]*model.CaptureI
if status == nil {
status = &model.ChangeFeedStatus{
// changefeed status is nil when the changefeed has just created.
ResolvedTs: c.state.Info.StartTs,
CheckpointTs: c.state.Info.StartTs,
MinTableBarrierTs: c.state.Info.StartTs,
AdminJobType: model.AdminNone,
Expand Down Expand Up @@ -946,22 +947,13 @@ func (c *changefeed) updateMetrics(currentTs int64, checkpointTs, resolvedTs mod
c.metricsCurrentPDTsGauge.Set(float64(currentTs))
}

func (c *changefeed) updateStatus(checkpointTs, resolvedTs, minTableBarrierTs model.Ts) {
if checkpointTs > resolvedTs {
log.Panic("checkpointTs is greater than resolvedTs",
zap.Uint64("checkpointTs", checkpointTs),
zap.Uint64("resolvedTs", resolvedTs))
}
func (c *changefeed) updateStatus(checkpointTs, minTableBarrierTs model.Ts) {
c.state.PatchStatus(
func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
changed := false
if status == nil {
return nil, changed, nil
}
if status.ResolvedTs != resolvedTs {
status.ResolvedTs = resolvedTs
changed = true
}
if status.CheckpointTs != checkpointTs {
status.CheckpointTs = checkpointTs
changed = true
Expand Down
3 changes: 1 addition & 2 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,6 @@ func TestBarrierAdvance(t *testing.T) {

// The changefeed load the info from etcd.
cf.state.Status = &model.ChangeFeedStatus{
ResolvedTs: cf.state.Info.StartTs + 10,
CheckpointTs: cf.state.Info.StartTs,
MinTableBarrierTs: cf.state.Info.StartTs + 5,
}
Expand All @@ -631,7 +630,7 @@ func TestBarrierAdvance(t *testing.T) {
}

// Suppose tableCheckpoint has been advanced.
cf.state.Status.CheckpointTs = cf.state.Status.ResolvedTs
cf.state.Status.CheckpointTs += 10

// Need more 1 tick to advance barrier if sync-point is enabled.
if i == 1 {
Expand Down
1 change: 0 additions & 1 deletion cdc/owner/feed_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,6 @@ func (m *feedStateManager) handleAdminJob() (jobsPending bool) {
if job.OverwriteCheckpointTs > 0 {
oldCheckpointTs := status.CheckpointTs
status = &model.ChangeFeedStatus{
ResolvedTs: job.OverwriteCheckpointTs,
CheckpointTs: job.OverwriteCheckpointTs,
MinTableBarrierTs: job.OverwriteCheckpointTs,
AdminJobType: model.AdminNone,
Expand Down
8 changes: 4 additions & 4 deletions cdc/owner/mock/status_provider_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading