Skip to content

Commit

Permalink
changefeed (ticdc): remove resolvedTs from etcd (pingcap#9194)
Browse files Browse the repository at this point in the history
  • Loading branch information
asddongmen committed Jul 3, 2023
1 parent 811d9bf commit 1b9dbd5
Show file tree
Hide file tree
Showing 17 changed files with 114 additions and 101 deletions.
34 changes: 22 additions & 12 deletions cdc/api/v1/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,28 +53,38 @@ type testCase struct {
method string
}

func (p *mockStatusProvider) GetAllChangeFeedStatuses(ctx context.Context) (map[model.ChangeFeedID]*model.ChangeFeedStatus, error) {
func (p *mockStatusProvider) GetAllChangeFeedStatuses(ctx context.Context) (
map[model.ChangeFeedID]*model.ChangeFeedStatusForAPI, error,
) {
args := p.Called(ctx)
return args.Get(0).(map[model.ChangeFeedID]*model.ChangeFeedStatus), args.Error(1)
return args.Get(0).(map[model.ChangeFeedID]*model.ChangeFeedStatusForAPI), args.Error(1)
}

func (p *mockStatusProvider) GetChangeFeedStatus(ctx context.Context, changefeedID model.ChangeFeedID) (*model.ChangeFeedStatus, error) {
func (p *mockStatusProvider) GetChangeFeedStatus(ctx context.Context, changefeedID model.ChangeFeedID) (
*model.ChangeFeedStatusForAPI, error,
) {
args := p.Called(ctx, changefeedID)
log.Info("err", zap.Error(args.Error(1)))
return args.Get(0).(*model.ChangeFeedStatus), args.Error(1)
return args.Get(0).(*model.ChangeFeedStatusForAPI), args.Error(1)
}

func (p *mockStatusProvider) GetAllChangeFeedInfo(ctx context.Context) (map[model.ChangeFeedID]*model.ChangeFeedInfo, error) {
func (p *mockStatusProvider) GetAllChangeFeedInfo(ctx context.Context) (
map[model.ChangeFeedID]*model.ChangeFeedInfo, error,
) {
args := p.Called(ctx)
return args.Get(0).(map[model.ChangeFeedID]*model.ChangeFeedInfo), args.Error(1)
}

func (p *mockStatusProvider) GetChangeFeedInfo(ctx context.Context, changefeedID model.ChangeFeedID) (*model.ChangeFeedInfo, error) {
func (p *mockStatusProvider) GetChangeFeedInfo(ctx context.Context, changefeedID model.ChangeFeedID) (
*model.ChangeFeedInfo, error,
) {
args := p.Called(ctx)
return args.Get(0).(*model.ChangeFeedInfo), args.Error(1)
}

func (p *mockStatusProvider) GetAllTaskStatuses(ctx context.Context, changefeedID model.ChangeFeedID) (map[model.CaptureID]*model.TaskStatus, error) {
func (p *mockStatusProvider) GetAllTaskStatuses(ctx context.Context, changefeedID model.ChangeFeedID) (
map[model.CaptureID]*model.TaskStatus, error,
) {
args := p.Called(ctx)
return args.Get(0).(map[model.CaptureID]*model.TaskStatus), args.Error(1)
}
Expand Down Expand Up @@ -103,17 +113,17 @@ func newRouter(c capture.Capture, p owner.StatusProvider) *gin.Engine {
func newStatusProvider() *mockStatusProvider {
statusProvider := &mockStatusProvider{}
statusProvider.On("GetChangeFeedStatus", mock.Anything, changeFeedID).
Return(&model.ChangeFeedStatus{CheckpointTs: 1}, nil)
Return(&model.ChangeFeedStatusForAPI{CheckpointTs: 1}, nil)

statusProvider.On("GetChangeFeedStatus", mock.Anything, nonExistChangefeedID).
Return(new(model.ChangeFeedStatus),
Return(new(model.ChangeFeedStatusForAPI),
cerror.ErrChangeFeedNotExists.GenWithStackByArgs(nonExistChangefeedID))

statusProvider.On("GetAllTaskStatuses", mock.Anything).
Return(map[model.CaptureID]*model.TaskStatus{captureID: {}}, nil)

statusProvider.On("GetAllChangeFeedStatuses", mock.Anything).
Return(map[model.ChangeFeedID]*model.ChangeFeedStatus{
Return(map[model.ChangeFeedID]*model.ChangeFeedStatusForAPI{
model.ChangeFeedID4Test("ab", "123"): {CheckpointTs: 1},
model.ChangeFeedID4Test("ab", "13"): {CheckpointTs: 2},
model.ChangeFeedID4Test("abc", "123"): {CheckpointTs: 1},
Expand Down Expand Up @@ -352,9 +362,9 @@ func TestRemoveChangefeed(t *testing.T) {

statusProvider := &mockStatusProvider{}
statusProvider.On("GetChangeFeedStatus", mock.Anything, changeFeedID).
Return(&model.ChangeFeedStatus{CheckpointTs: 1}, nil).Once()
Return(&model.ChangeFeedStatusForAPI{CheckpointTs: 1}, nil).Once()
statusProvider.On("GetChangeFeedStatus", mock.Anything, changeFeedID).
Return(new(model.ChangeFeedStatus),
Return(new(model.ChangeFeedStatusForAPI),
cerror.ErrChangeFeedNotExists.FastGenByArgs(changeFeedID)).Once()

router1 := newRouter(cp, statusProvider)
Expand Down
2 changes: 1 addition & 1 deletion cdc/api/v2/api_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestVerifyCreateChangefeedConfig(t *testing.T) {
cfg.ID = ""
cfg.Namespace = ""
// changefeed already exists
provider.changefeedStatus = &model.ChangeFeedStatus{}
provider.changefeedStatus = &model.ChangeFeedStatusForAPI{}
cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, provider, "en", storage)
require.NotNil(t, err)
provider.changefeedStatus = nil
Expand Down
8 changes: 4 additions & 4 deletions cdc/api/v2/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,19 +62,19 @@ func (c *mockPDClient) Close() {}

type mockStatusProvider struct {
owner.StatusProvider
changefeedStatus *model.ChangeFeedStatus
changefeedStatus *model.ChangeFeedStatusForAPI
changefeedInfo *model.ChangeFeedInfo
processors []*model.ProcInfoSnap
taskStatus map[model.CaptureID]*model.TaskStatus
changefeedInfos map[model.ChangeFeedID]*model.ChangeFeedInfo
changefeedStatuses map[model.ChangeFeedID]*model.ChangeFeedStatus
changefeedStatuses map[model.ChangeFeedID]*model.ChangeFeedStatusForAPI
err error
}

// GetChangeFeedStatus returns a changefeeds' runtime status.
func (m *mockStatusProvider) GetChangeFeedStatus(ctx context.Context,
changefeedID model.ChangeFeedID,
) (*model.ChangeFeedStatus, error) {
) (*model.ChangeFeedStatusForAPI, error) {
return m.changefeedStatus, m.err
}

Expand Down Expand Up @@ -114,7 +114,7 @@ func (m *mockStatusProvider) GetAllChangeFeedInfo(_ context.Context) (

// GetAllChangeFeedStatuses returns a list of mock changefeed status.
func (m *mockStatusProvider) GetAllChangeFeedStatuses(_ context.Context) (
map[model.ChangeFeedID]*model.ChangeFeedStatus,
map[model.ChangeFeedID]*model.ChangeFeedStatusForAPI,
error,
) {
return m.changefeedStatuses, m.err
Expand Down
10 changes: 5 additions & 5 deletions cdc/api/v2/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func TestGetChangeFeed(t *testing.T) {
Code: string(cerrors.ErrStartTsBeforeGC.RFCCode()),
},
}
statusProvider.changefeedStatus = &model.ChangeFeedStatus{
statusProvider.changefeedStatus = &model.ChangeFeedStatusForAPI{
CheckpointTs: 1,
}
w = httptest.NewRecorder()
Expand Down Expand Up @@ -442,7 +442,7 @@ func TestUpdateChangefeed(t *testing.T) {
Return(&model.ChangeFeedInfo{}, &model.UpstreamInfo{}, cerrors.ErrChangefeedUpdateRefused).
Times(1)

statusProvider.changefeedStatus = &model.ChangeFeedStatus{
statusProvider.changefeedStatus = &model.ChangeFeedStatusForAPI{
CheckpointTs: 1,
}
w = httptest.NewRecorder()
Expand Down Expand Up @@ -538,7 +538,7 @@ func TestListChangeFeeds(t *testing.T) {
State: model.StateStopped,
},
},
changefeedStatuses: map[model.ChangeFeedID]*model.ChangeFeedStatus{
changefeedStatuses: map[model.ChangeFeedID]*model.ChangeFeedStatusForAPI{
model.DefaultChangeFeedID("cf1"): {},
model.DefaultChangeFeedID("cf2"): {},
model.DefaultChangeFeedID("cf3"): {},
Expand Down Expand Up @@ -844,7 +844,7 @@ func TestDeleteChangefeed(t *testing.T) {

// case 4: remove changefeed
statusProvider.EXPECT().GetChangeFeedStatus(gomock.Any(), gomock.Any()).Return(
&model.ChangeFeedStatus{}, nil)
&model.ChangeFeedStatusForAPI{}, nil)
statusProvider.EXPECT().GetChangeFeedStatus(gomock.Any(), gomock.Any()).Return(
nil, cerrors.ErrChangeFeedNotExists.GenWithStackByArgs(validID))
w = httptest.NewRecorder()
Expand All @@ -855,7 +855,7 @@ func TestDeleteChangefeed(t *testing.T) {

// case 5: remove changefeed failed
statusProvider.EXPECT().GetChangeFeedStatus(gomock.Any(), gomock.Any()).AnyTimes().Return(
&model.ChangeFeedStatus{}, nil)
&model.ChangeFeedStatusForAPI{}, nil)
w = httptest.NewRecorder()
req, _ = http.NewRequestWithContext(context.Background(), remove.method,
fmt.Sprintf(remove.url, validID), nil)
Expand Down
10 changes: 10 additions & 0 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,3 +494,13 @@ func (t DownstreamType) String() string {
}
return "Unknown"
}

// ChangeFeedStatusForAPI uses to transfer the status of changefeed for API.
type ChangeFeedStatusForAPI struct {
ResolvedTs uint64 `json:"resolved-ts"`
CheckpointTs uint64 `json:"checkpoint-ts"`
// minTableBarrierTs is the minimum commitTs of all DDL events and is only
// used to check whether there is a pending DDL job at the checkpointTs when
// initializing the changefeed.
MinTableBarrierTs uint64 `json:"min-table-barrier-ts"`
}
8 changes: 5 additions & 3 deletions cdc/model/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,14 +269,16 @@ func (p ProcessorsInfos) String() string {
}

// ChangeFeedStatus stores information about a ChangeFeed
// It is stored in etcd.
type ChangeFeedStatus struct {
ResolvedTs uint64 `json:"resolved-ts"`
CheckpointTs uint64 `json:"checkpoint-ts"`
// minTableBarrierTs is the minimum commitTs of all DDL events and is only
// used to check whether there is a pending DDL job at the checkpointTs when
// initializing the changefeed.
MinTableBarrierTs uint64 `json:"min-table-barrier-ts"`
AdminJobType AdminJobType `json:"admin-job-type"`
MinTableBarrierTs uint64 `json:"min-table-barrier-ts"`
// TODO: remove this filed after we don't use ChangeFeedStatus to
// control processor. This is too ambiguous.
AdminJobType AdminJobType `json:"admin-job-type"`
}

// Marshal returns json encoded string of ChangeFeedStatus, only contains necessary fields stored in storage
Expand Down
3 changes: 1 addition & 2 deletions cdc/model/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,9 @@ func TestChangeFeedStatusMarshal(t *testing.T) {
t.Parallel()

status := &ChangeFeedStatus{
ResolvedTs: 420875942036766723,
CheckpointTs: 420875940070686721,
}
expected := `{"resolved-ts":420875942036766723,"checkpoint-ts":420875940070686721,
expected := `{"checkpoint-ts":420875940070686721,
"min-table-barrier-ts":0,"admin-job-type":0}`
expected = strings.ReplaceAll(expected, "\n", "")

Expand Down
44 changes: 17 additions & 27 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type changefeed struct {
// and will be destroyed when a changefeed is closed.
barriers *barriers
feedStateManager *feedStateManager
resolvedTs model.Ts

// ddl related fields
ddlManager *ddlManager
Expand Down Expand Up @@ -356,7 +357,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID),
zap.Uint64("preCheckpointTs", preCheckpointTs),
zap.Uint64("preResolvedTs", c.state.Status.ResolvedTs),
zap.Uint64("preResolvedTs", c.resolvedTs),
zap.Uint64("globalBarrierTs", barrier.GlobalBarrierTs),
zap.Uint64("minTableBarrierTs", barrier.MinTableBarrierTs),
zap.Any("tableBarrier", barrier.TableBarriers))
Expand All @@ -383,23 +384,20 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
if c.state.Status != nil {
// We should keep the metrics updated even if the scheduler cannot
// advance the watermarks for now.
c.updateMetrics(currentTs, c.state.Status.CheckpointTs, c.state.Status.ResolvedTs)
c.updateMetrics(currentTs, c.state.Status.CheckpointTs, c.resolvedTs)
}
return nil
}

prevResolvedTs := c.state.Status.ResolvedTs

log.Debug("owner prepares to update status",
zap.Uint64("prevResolvedTs", prevResolvedTs),
zap.Uint64("prevResolvedTs", c.resolvedTs),
zap.Uint64("newResolvedTs", newResolvedTs),
zap.Uint64("newCheckpointTs", newCheckpointTs),
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID))
// resolvedTs should never regress but checkpointTs can, as checkpointTs has already
// been decreased when the owner is initialized.
if newResolvedTs < prevResolvedTs {
newResolvedTs = prevResolvedTs
// resolvedTs should never regress.
if newResolvedTs > c.resolvedTs {
c.resolvedTs = newResolvedTs
}

// MinTableBarrierTs should never regress
Expand All @@ -418,8 +416,8 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
}
})

c.updateStatus(newCheckpointTs, newResolvedTs, barrier.MinTableBarrierTs)
c.updateMetrics(currentTs, newCheckpointTs, newResolvedTs)
c.updateStatus(newCheckpointTs, barrier.MinTableBarrierTs)
c.updateMetrics(currentTs, newCheckpointTs, c.resolvedTs)
c.tickDownstreamObserver(ctx)

return nil
Expand Down Expand Up @@ -456,7 +454,10 @@ LOOP2:
}

checkpointTs := c.state.Status.CheckpointTs
resolvedTs := c.state.Status.ResolvedTs
if c.resolvedTs == 0 {
c.resolvedTs = checkpointTs
}

minTableBarrierTs := c.state.Status.MinTableBarrierTs

failpoint.Inject("NewChangefeedNoRetryError", func() {
Expand Down Expand Up @@ -520,9 +521,8 @@ LOOP2:
}

c.barriers = newBarriers()
if c.state.Info.Config.EnableSyncPoint { // preResolvedTs model.Ts

c.barriers.Update(syncPointBarrier, resolvedTs)
if c.state.Info.Config.EnableSyncPoint {
c.barriers.Update(syncPointBarrier, c.resolvedTs)
}
c.barriers.Update(finishBarrier, c.state.Info.GetTargetTs())

Expand Down Expand Up @@ -649,7 +649,7 @@ LOOP2:
zap.String("changefeed", c.state.ID.ID),
zap.Uint64("changefeedEpoch", epoch),
zap.Uint64("checkpointTs", checkpointTs),
zap.Uint64("resolvedTs", resolvedTs),
zap.Uint64("resolvedTs", c.resolvedTs),
zap.Stringer("info", c.state.Info))

return nil
Expand Down Expand Up @@ -826,7 +826,6 @@ func (c *changefeed) preflightCheck(captures map[model.CaptureID]*model.CaptureI
if status == nil {
status = &model.ChangeFeedStatus{
// changefeed status is nil when the changefeed has just created.
ResolvedTs: c.state.Info.StartTs,
CheckpointTs: c.state.Info.StartTs,
MinTableBarrierTs: c.state.Info.StartTs,
AdminJobType: model.AdminNone,
Expand Down Expand Up @@ -937,22 +936,13 @@ func (c *changefeed) updateMetrics(currentTs int64, checkpointTs, resolvedTs mod
c.metricsCurrentPDTsGauge.Set(float64(currentTs))
}

func (c *changefeed) updateStatus(checkpointTs, resolvedTs, minTableBarrierTs model.Ts) {
if checkpointTs > resolvedTs {
log.Panic("checkpointTs is greater than resolvedTs",
zap.Uint64("checkpointTs", checkpointTs),
zap.Uint64("resolvedTs", resolvedTs))
}
func (c *changefeed) updateStatus(checkpointTs, minTableBarrierTs model.Ts) {
c.state.PatchStatus(
func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
changed := false
if status == nil {
return nil, changed, nil
}
if status.ResolvedTs != resolvedTs {
status.ResolvedTs = resolvedTs
changed = true
}
if status.CheckpointTs != checkpointTs {
status.CheckpointTs = checkpointTs
changed = true
Expand Down
3 changes: 1 addition & 2 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,6 @@ func TestBarrierAdvance(t *testing.T) {

// The changefeed load the info from etcd.
cf.state.Status = &model.ChangeFeedStatus{
ResolvedTs: cf.state.Info.StartTs + 10,
CheckpointTs: cf.state.Info.StartTs,
MinTableBarrierTs: cf.state.Info.StartTs + 5,
}
Expand All @@ -623,7 +622,7 @@ func TestBarrierAdvance(t *testing.T) {
}

// Suppose tableCheckpoint has been advanced.
cf.state.Status.CheckpointTs = cf.state.Status.ResolvedTs
cf.state.Status.CheckpointTs += 10

// Need more 1 tick to advance barrier if sync-point is enabled.
if i == 1 {
Expand Down
1 change: 0 additions & 1 deletion cdc/owner/feed_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,6 @@ func (m *feedStateManager) handleAdminJob() (jobsPending bool) {
if job.OverwriteCheckpointTs > 0 {
oldCheckpointTs := status.CheckpointTs
status = &model.ChangeFeedStatus{
ResolvedTs: job.OverwriteCheckpointTs,
CheckpointTs: job.OverwriteCheckpointTs,
MinTableBarrierTs: job.OverwriteCheckpointTs,
AdminJobType: model.AdminNone,
Expand Down
8 changes: 4 additions & 4 deletions cdc/owner/mock/status_provider_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 1b9dbd5

Please sign in to comment.