scheduler,processor(ticdc): update capture liveness by heartbeat #6613

Merged 3 commits on Aug 4, 2022
3 changes: 2 additions & 1 deletion cdc/capture/capture_test.go
@@ -443,7 +443,8 @@ func TestCampaignLiveness(t *testing.T) {
require.Nil(t, err)
require.False(t, me.campaignFlag)

cp.liveness.Store(model.LivenessCaptureAlive)
// Force set alive.
cp.liveness = model.LivenessCaptureAlive
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
8 changes: 5 additions & 3 deletions cdc/model/http_model.go
@@ -61,6 +61,7 @@ func NewHTTPError(err error) HTTPError {
}

// Liveness is the liveness status of a capture.
// Liveness can only be changed from alive to stopping, and no way back.
type Liveness int32

const (
@@ -70,9 +71,10 @@ const (
LivenessCaptureStopping Liveness = 1
)

// Store the given liveness.
func (l *Liveness) Store(v Liveness) {
atomic.StoreInt32((*int32)(l), int32(v))
// Store the given liveness. Returns true if it succeeds.
func (l *Liveness) Store(v Liveness) bool {
return atomic.CompareAndSwapInt32(
(*int32)(l), int32(LivenessCaptureAlive), int32(v))
}

// Load the liveness.
19 changes: 19 additions & 0 deletions cdc/model/http_model_test.go
@@ -22,6 +22,8 @@ import (
)

func TestChangefeedCommonInfoMarshalJSON(t *testing.T) {
t.Parallel()

runningErr := &RunningError{
"",
string(errors.ErrProcessorUnknown.RFCCode()),
@@ -45,6 +47,8 @@ func TestChangefeedCommonInfoMarshalJSON(t *testing.T) {
}

func TestChangefeedDetailMarshalJSON(t *testing.T) {
t.Parallel()

runningErr := &RunningError{
"",
string(errors.ErrProcessorUnknown.RFCCode()),
@@ -66,3 +70,18 @@ func TestChangefeedDetailMarshalJSON(t *testing.T) {
require.Nil(t, err)
require.Contains(t, string(cfInfoJSON), string(errors.ErrProcessorUnknown.RFCCode()))
}

func TestLiveness(t *testing.T) {
t.Parallel()

liveness := LivenessCaptureAlive

require.True(t, liveness.Store(LivenessCaptureAlive))
require.Equal(t, LivenessCaptureAlive, liveness.Load())

require.True(t, liveness.Store(LivenessCaptureStopping))
require.Equal(t, LivenessCaptureStopping, liveness.Load())

require.False(t, liveness.Store(LivenessCaptureAlive))
require.Equal(t, LivenessCaptureStopping, liveness.Load())
}
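
Because Store is now a compare-and-swap from LivenessCaptureAlive, its boolean result tells a caller whether it performed the single allowed alive-to-stopping transition. A minimal sketch of how a caller might use that result follows; the beginDrain helper is hypothetical and not part of this PR:

// Hypothetical helper, not part of this PR: only the first successful
// transition from alive to stopping should trigger shutdown work.
func beginDrain(liveness *model.Liveness) bool {
	if liveness.Store(model.LivenessCaptureStopping) {
		// The CAS succeeded, so this caller owns the transition and can
		// start draining tables, deregistering the capture, and so on.
		return true
	}
	// The liveness was no longer alive; the CAS failed and the shutdown
	// work is not repeated.
	return false
}

The last two assertions in TestLiveness above check the same invariant from the other direction: once the value is stopping, a Store back to alive fails and the value stays stopping.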
16 changes: 10 additions & 6 deletions cdc/processor/processor.go
@@ -77,7 +77,7 @@ type processor struct {

lazyInit func(ctx cdcContext.Context) error
createTablePipeline func(ctx cdcContext.Context, tableID model.TableID, replicaInfo *model.TableReplicaInfo) (pipeline.TablePipeline, error)
newAgent func(ctx cdcContext.Context) (scheduler.Agent, error)
newAgent func(cdcContext.Context, *model.Liveness) (scheduler.Agent, error)
Contributor:

Could you consider making an AgentFactory interface? Then we could put whatever extra arguments we need into the implementation of an AgentFactory, so that further changes would not affect how the interface method NewAgent is called.

Just a suggestion. If you think this part of the code is not subject to any more significant changes, I think the current way is fine.

Member (Author):

I don't think it will change anytime soon; let's add the factory when necessary.
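
For context, the suggested factory could look roughly like the sketch below. This is a hypothetical illustration, not code from the PR: the interface, struct, and fields are made up, and a real implementation would forward whatever arguments newAgentImpl currently needs.

// Hypothetical AgentFactory: constructor dependencies live inside the
// factory, so adding one (such as *model.Liveness) would not change the
// NewAgent call site in the processor.
type AgentFactory interface {
	NewAgent(ctx cdcContext.Context) (scheduler.Agent, error)
}

type agentFactory struct {
	captureID model.CaptureID
	liveness  *model.Liveness
	// message server, message router, etcd client, changefeed ID, ...
}

func (f *agentFactory) NewAgent(ctx cdcContext.Context) (scheduler.Agent, error) {
	// A real implementation would pass f's fields to scheduler.NewAgent
	// or scheduler.NewAgentV3, much as newAgentImpl does today.
	return nil, nil
}

With that shape, the processor would hold an AgentFactory field instead of the newAgent function value, and future dependencies would only touch the factory struct.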


liveness *model.Liveness
agent scheduler.Agent
@@ -576,7 +576,7 @@ func (p *processor) tick(ctx cdcContext.Context, state *orchestrator.ChangefeedR

p.doGCSchemaStorage()

if err := p.agent.Tick(ctx, p.liveness.Load()); err != nil {
if err := p.agent.Tick(ctx); err != nil {
return errors.Trace(err)
}
return nil
@@ -692,7 +692,7 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {
return err
}

p.agent, err = p.newAgent(ctx)
p.agent, err = p.newAgent(ctx, p.liveness)
if err != nil {
return err
}
@@ -705,18 +705,22 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {
return nil
}

func (p *processor) newAgentImpl(ctx cdcContext.Context) (ret scheduler.Agent, err error) {
func (p *processor) newAgentImpl(
ctx cdcContext.Context, liveness *model.Liveness,
) (ret scheduler.Agent, err error) {
messageServer := ctx.GlobalVars().MessageServer
messageRouter := ctx.GlobalVars().MessageRouter
etcdClient := ctx.GlobalVars().EtcdClient
captureID := ctx.GlobalVars().CaptureInfo.ID
cfg := config.GetGlobalServerConfig().Debug
if cfg.EnableSchedulerV3 {
ret, err = scheduler.NewAgentV3(
ctx, captureID, messageServer, messageRouter, etcdClient, p, p.changefeedID)
ctx, captureID, liveness,
messageServer, messageRouter, etcdClient, p, p.changefeedID)
} else {
ret, err = scheduler.NewAgent(
ctx, captureID, messageServer, messageRouter, etcdClient, p, p.changefeedID)
ctx, captureID, liveness,
messageServer, messageRouter, etcdClient, p, p.changefeedID)
}
return ret, errors.Trace(err)
}
24 changes: 16 additions & 8 deletions cdc/processor/processor_test.go
@@ -236,12 +236,11 @@ type mockAgent struct {

executor scheduler.TableExecutor
lastCheckpointTs model.Ts
lastLiveness model.Liveness
liveness *model.Liveness
isClosed bool
}

func (a *mockAgent) Tick(_ context.Context, liveness model.Liveness) error {
a.lastLiveness = liveness
func (a *mockAgent) Tick(_ context.Context) error {
if len(a.executor.GetAllCurrentTables()) == 0 {
return nil
}
@@ -823,6 +822,12 @@ func TestProcessorLiveness(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
liveness := model.LivenessCaptureAlive
p, tester := initProcessor4Test(ctx, t, &liveness)
p.lazyInit = func(ctx cdcContext.Context) error {
// Mock the newAgent procedure in p.lazyInitImpl,
// by passing p.liveness to mockAgent.
p.agent = &mockAgent{executor: p, liveness: p.liveness}
return nil
}

// First tick for creating position.
_, err := p.Tick(ctx, p.changefeed)
@@ -832,10 +837,13 @@
// Second tick for init.
_, err = p.Tick(ctx, p.changefeed)
require.Nil(t, err)
require.Equal(t, model.LivenessCaptureAlive, p.agent.(*mockAgent).lastLiveness)

liveness.Store(model.LivenessCaptureStopping)
_, err = p.Tick(ctx, p.changefeed)
require.Nil(t, err)
require.Equal(t, model.LivenessCaptureStopping, p.agent.(*mockAgent).lastLiveness)
// Changing p.liveness affects p.agent liveness.
p.liveness.Store(model.LivenessCaptureStopping)
require.Equal(t, model.LivenessCaptureStopping, p.agent.(*mockAgent).liveness.Load())

// Changing p.agent liveness affects p.liveness.
// Force set liveness to alive.
*p.agent.(*mockAgent).liveness = model.LivenessCaptureAlive
require.Equal(t, model.LivenessCaptureAlive, p.liveness.Load())
}
2 changes: 1 addition & 1 deletion cdc/scheduler/internal/agent.go
@@ -26,7 +26,7 @@ import (
// Note that Agent is not thread-safe
type Agent interface {
// Tick is called periodically by the processor to drive the Agent's internal logic.
Tick(context.Context, model.Liveness) error
Tick(context.Context) error

// GetLastSentCheckpointTs returns the last checkpoint-ts already sent to the Owner.
GetLastSentCheckpointTs() (checkpointTs model.Ts)
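
Dropping the model.Liveness argument from Tick works because the agent now receives the shared *model.Liveness at construction and can read the current value whenever it needs it, for example when answering the owner's heartbeat. A rough sketch of that pattern follows; the response type and field names are illustrative only, since the real schedulepb message definitions are not part of this diff:

// Illustrative only: an agent holding the shared liveness pointer can
// report the up-to-date value in every heartbeat response instead of
// relying on a snapshot passed into Tick.
type exampleAgent struct {
	liveness *model.Liveness
}

type exampleHeartbeatResponse struct {
	Liveness model.Liveness
}

func (a *exampleAgent) handleHeartbeat() *exampleHeartbeatResponse {
	return &exampleHeartbeatResponse{
		// Load is atomic, so a concurrent alive-to-stopping transition
		// made by the capture is visible here.
		Liveness: a.liveness.Load(),
	}
}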
2 changes: 1 addition & 1 deletion cdc/scheduler/internal/v2/processor_agent.go
@@ -159,7 +159,7 @@ func NewAgent(
return ret, nil
}

func (a *agentImpl) Tick(ctx context.Context, liveness model.Liveness) error {
func (a *agentImpl) Tick(ctx context.Context) error {
for _, errCh := range a.handlerErrChs {
select {
case <-ctx.Done():
24 changes: 12 additions & 12 deletions cdc/scheduler/internal/v2/processor_agent_test.go
@@ -263,7 +263,7 @@ func TestAgentBasics(t *testing.T) {
require.NoError(t, err)

// Test Point 2: First tick should sync the SyncMessage.
err = agent.Tick(suite.ctx, model.LivenessCaptureAlive)
err = agent.Tick(suite.ctx)
require.NoError(t, err)

select {
@@ -304,7 +304,7 @@ func TestAgentBasics(t *testing.T) {
Return(model.Ts(1000), model.Ts(1000))

require.Eventually(t, func() bool {
err = agent.Tick(suite.ctx, model.LivenessCaptureAlive)
err = agent.Tick(suite.ctx)
require.NoError(t, err)
if len(suite.tableExecutor.Running) != 1 {
return false
@@ -329,7 +329,7 @@ func TestAgentBasics(t *testing.T) {

suite.tableExecutor.On("GetCheckpoint").Return(model.Ts(1000), model.Ts(1000))
// Test Point 4: Accept an incoming DispatchTableMessage, and the AddTable method in TableExecutor can return true.
err = agent.Tick(suite.ctx, model.LivenessCaptureAlive)
err = agent.Tick(suite.ctx)
require.NoError(t, err)

require.Eventually(t, func() bool {
@@ -345,7 +345,7 @@
default:
}

err = agent.Tick(suite.ctx, model.LivenessCaptureAlive)
err = agent.Tick(suite.ctx)
require.NoError(t, err)
return false
}, time.Second*3, time.Millisecond*10)
@@ -378,7 +378,7 @@ func TestAgentNoOwnerAtStartUp(t *testing.T) {

// Test Point 2: First ticks should not panic
for i := 0; i < 10; i++ {
err = agent.Tick(suite.ctx, model.LivenessCaptureAlive)
err = agent.Tick(suite.ctx)
require.NoError(t, err)
}

@@ -392,7 +392,7 @@ func TestAgentNoOwnerAtStartUp(t *testing.T) {
require.NoError(t, err)

require.Eventually(t, func() bool {
err := agent.Tick(suite.ctx, model.LivenessCaptureAlive)
err := agent.Tick(suite.ctx)
require.NoError(t, err)
select {
case <-suite.ctx.Done():
@@ -452,7 +452,7 @@ func TestAgentTolerateClientClosed(t *testing.T) {

// Test Point 2: We should tolerate the error ErrPeerMessageClientClosed
for i := 0; i < 6; i++ {
err = agent.Tick(suite.ctx, model.LivenessCaptureAlive)
err = agent.Tick(suite.ctx)
require.NoError(t, err)
}

@@ -495,7 +495,7 @@ func TestNoFinishOperationBeforeSyncIsReceived(t *testing.T) {

suite.BlockSync()

err = agent.Tick(suite.ctx, model.LivenessCaptureAlive)
err = agent.Tick(suite.ctx)
require.NoError(t, err)

_, err = suite.ownerMessageClient.SendMessage(suite.ctx,
@@ -541,7 +541,7 @@ func TestNoFinishOperationBeforeSyncIsReceived(t *testing.T) {

start := time.Now()
for time.Since(start) < 100*time.Millisecond {
err := agent.Tick(suite.ctx, model.LivenessCaptureAlive)
err := agent.Tick(suite.ctx)
require.NoError(t, err)

select {
@@ -574,7 +574,7 @@ func TestNoFinishOperationBeforeSyncIsReceived(t *testing.T) {
default:
}

err := agent.Tick(suite.ctx, model.LivenessCaptureAlive)
err := agent.Tick(suite.ctx)
require.NoError(t, err)
return false
}, 1*time.Second, 10*time.Millisecond)
@@ -588,7 +588,7 @@ func TestNoFinishOperationBeforeSyncIsReceived(t *testing.T) {
default:
}

err := agent.Tick(suite.ctx, model.LivenessCaptureAlive)
err := agent.Tick(suite.ctx)
require.NoError(t, err)
return false
}, time.Second*3, time.Millisecond*10)
@@ -602,7 +602,7 @@ func TestNoFinishOperationBeforeSyncIsReceived(t *testing.T) {
default:
}

err := agent.Tick(suite.ctx, model.LivenessCaptureAlive)
err := agent.Tick(suite.ctx)
require.NoError(t, err)
return false
}, time.Second*3, time.Millisecond*10)