From aa020c03064a443c98cb2c6b2c6b25095939a0c7 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 1 Jun 2022 17:04:23 +0800 Subject: [PATCH 1/3] add more tests. --- cdc/scheduler/internal/tp/agent.go | 20 +- cdc/scheduler/internal/tp/agent_bench_test.go | 2 +- cdc/scheduler/internal/tp/agent_test.go | 236 +++++++++++++++++- cdc/scheduler/internal/tp/coordinator_test.go | 54 ++-- 4 files changed, 270 insertions(+), 42 deletions(-) diff --git a/cdc/scheduler/internal/tp/agent.go b/cdc/scheduler/internal/tp/agent.go index 31bc29b49da..5a6afd8e017 100644 --- a/cdc/scheduler/internal/tp/agent.go +++ b/cdc/scheduler/internal/tp/agent.go @@ -24,7 +24,6 @@ import ( "github.com/pingcap/tiflow/cdc/processor/pipeline" "github.com/pingcap/tiflow/cdc/scheduler/internal" "github.com/pingcap/tiflow/cdc/scheduler/internal/tp/schedulepb" - "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/etcd" "github.com/pingcap/tiflow/pkg/p2p" @@ -83,15 +82,6 @@ func NewAgent(ctx context.Context, } result.trans = trans - conf := config.GetGlobalServerConfig() - flushInterval := time.Duration(conf.ProcessorFlushInterval) - - log.Debug("tpscheduler: creating processor agent", - zap.String("capture", captureID), - zap.String("namespace", changeFeedID.Namespace), - zap.String("changefeed", changeFeedID.ID), - zap.Duration("sendCheckpointTsInterval", flushInterval)) - etcdCliCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() @@ -185,7 +175,6 @@ func (a *agent) handleMessage(msg []*schedulepb.Message) []*schedulepb.Message { response := a.handleMessageHeartbeat(message.Heartbeat.GetTableIDs()) result = append(result, response) case schedulepb.MsgUnknown: - default: log.Warn("tpscheduler: unknown message received", zap.String("capture", a.captureID), zap.String("namespace", a.changeFeedID.Namespace), @@ -579,11 +568,12 @@ func (a *agent) recvMsgs(ctx context.Context) ([]*schedulepb.Message, error) { n := 0 for _, val := range messages { - // Filter stale messages. 
- if val.Header.OwnerRevision == a.ownerInfo.revision { - messages[n] = val - n++ + // only receive not staled messages + if !a.handleOwnerInfo(val.From, val.Header.OwnerRevision.Revision, val.Header.Version) { + continue } + messages[n] = val + n++ } return messages[:n], nil } diff --git a/cdc/scheduler/internal/tp/agent_bench_test.go b/cdc/scheduler/internal/tp/agent_bench_test.go index 4d1b4b508ff..8f4bcedd6e4 100644 --- a/cdc/scheduler/internal/tp/agent_bench_test.go +++ b/cdc/scheduler/internal/tp/agent_bench_test.go @@ -24,7 +24,7 @@ import ( func benchmarkCollectTableStatus(b *testing.B, bench func(b *testing.B, a *agent)) { upperBound := 16384 for size := 1; size <= upperBound; size *= 2 { - tableExec := NewMockTableExecutor() + tableExec := newMockTableExecutor() a := &agent{ tableExec: tableExec, } diff --git a/cdc/scheduler/internal/tp/agent_test.go b/cdc/scheduler/internal/tp/agent_test.go index 837f3bfb054..880eb49f5f0 100644 --- a/cdc/scheduler/internal/tp/agent_test.go +++ b/cdc/scheduler/internal/tp/agent_test.go @@ -15,12 +15,12 @@ package tp import ( "context" + "sort" "testing" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/processor/pipeline" - "github.com/pingcap/tiflow/cdc/scheduler/internal/base" "github.com/pingcap/tiflow/cdc/scheduler/internal/tp/schedulepb" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -36,16 +36,56 @@ func newBaseAgent4Test() *agent { }, version: "agent-version-1", epoch: schedulepb.ProcessorEpoch{Epoch: "agent-epoch-1"}, + captureID: "agent-1", runningTasks: make(map[model.TableID]*dispatchTableTask), } } +func TestAgentCollectTableStatus(t *testing.T) { + t.Parallel() + + a := newBaseAgent4Test() + + mockTableExecutor := newMockTableExecutor() + a.tableExec = mockTableExecutor + + mockTableExecutor.tables[model.TableID(0)] = pipeline.TableStatePreparing + mockTableExecutor.tables[model.TableID(1)] = pipeline.TableStatePrepared + mockTableExecutor.tables[model.TableID(2)] = pipeline.TableStateReplicating + mockTableExecutor.tables[model.TableID(3)] = pipeline.TableStateStopping + mockTableExecutor.tables[model.TableID(4)] = pipeline.TableStateStopped + + expected := make([]model.TableID, 0, 10) + for i := 0; i < 10; i++ { + expected = append(expected, model.TableID(i)) + } + + result := a.collectTableStatus(expected) + require.Len(t, result, 10) + sort.Slice(result, func(i, j int) bool { + return result[i].TableID < result[j].TableID + }) + require.Equal(t, schedulepb.TableStatePreparing, result[0].State) + require.Equal(t, schedulepb.TableStatePrepared, result[1].State) + require.Equal(t, schedulepb.TableStateReplicating, result[2].State) + require.Equal(t, schedulepb.TableStateStopping, result[3].State) + require.Equal(t, schedulepb.TableStateStopped, result[4].State) + for i := 5; i < 10; i++ { + require.Equal(t, schedulepb.TableStateAbsent, result[i].State) + } + + a.runningTasks[model.TableID(0)] = &dispatchTableTask{IsRemove: true} + status := a.newTableStatus(model.TableID(0)) + require.Equal(t, schedulepb.TableStateStopping, status.State) + +} + func TestAgentHandleDispatchTableTask(t *testing.T) { t.Parallel() a := newBaseAgent4Test() - mockTableExecutor := NewMockTableExecutor() + mockTableExecutor := newMockTableExecutor() a.tableExec = mockTableExecutor tableID := model.TableID(1) @@ -146,11 +186,76 @@ func TestAgentHandleDispatchTableTask(t *testing.T) { } } +func TestAgentHandleMessageStopping(t *testing.T) { + t.Parallel() + + a := newBaseAgent4Test() + 
a.tableExec = newMockTableExecutor() + a.stopping = true + + heartbeat := &schedulepb.Message{ + Header: &schedulepb.Message_Header{ + Version: "version-1", + OwnerRevision: schedulepb.OwnerRevision{Revision: 1}, + }, + MsgType: schedulepb.MsgHeartbeat, + From: "owner-1", + Heartbeat: &schedulepb.Heartbeat{}, + } + response := a.handleMessage([]*schedulepb.Message{heartbeat}) + require.Len(t, response, 1) + require.NotNil(t, response[0].HeartbeatResponse) + // agent is stopping, let coordinator know this. + require.True(t, response[0].HeartbeatResponse.IsStopping) + + addTableRequest := &schedulepb.Message{ + Header: &schedulepb.Message_Header{ + Version: "version-1", + OwnerRevision: schedulepb.OwnerRevision{Revision: 1}, + ProcessorEpoch: schedulepb.ProcessorEpoch{Epoch: "agent-epoch-1"}, + }, + MsgType: schedulepb.MsgDispatchTableRequest, + From: "owner-1", + DispatchTableRequest: &schedulepb.DispatchTableRequest{ + Request: &schedulepb.DispatchTableRequest_AddTable{ + AddTable: &schedulepb.AddTableRequest{ + TableID: 1, + IsSecondary: true, + Checkpoint: &schedulepb.Checkpoint{}, + }, + }, + }, + } + // add table request should not be handled, so the running task count is 0. + response = a.handleMessage([]*schedulepb.Message{addTableRequest}) + require.Len(t, a.runningTasks, 0) + + // mock agent have running task before stopping but processed yet. + a.runningTasks[model.TableID(1)] = &dispatchTableTask{ + TableID: model.TableID(1), + StartTs: 0, + IsRemove: false, + IsPrepare: false, + Epoch: schedulepb.ProcessorEpoch{}, + status: dispatchTableTaskReceived, + } + + result, err := a.handleDispatchTableTasks(context.Background()) + require.NoError(t, err) + require.Len(t, a.runningTasks, 0) + require.Len(t, result, 1) + + addTableResponse, ok := result[0].DispatchTableResponse. 
+ Response.(*schedulepb.DispatchTableResponse_AddTable) + require.True(t, ok) + require.True(t, addTableResponse.AddTable.Reject) +} + func TestAgentHandleMessage(t *testing.T) { t.Parallel() a := newBaseAgent4Test() - a.tableExec = base.NewMockTableExecutor(t) + a.tableExec = newMockTableExecutor() heartbeat := &schedulepb.Message{ Header: &schedulepb.Message_Header{ @@ -196,6 +301,25 @@ func TestAgentHandleMessage(t *testing.T) { response = a.handleMessage([]*schedulepb.Message{heartbeat}) require.Equal(t, len(a.runningTasks), 1) require.Len(t, response, 1) + + // this should never happen in real world + unknownMessage := &schedulepb.Message{ + Header: &schedulepb.Message_Header{ + Version: "version-1", + OwnerRevision: schedulepb.OwnerRevision{Revision: 2}, + ProcessorEpoch: schedulepb.ProcessorEpoch{Epoch: "agent-epoch-1"}, + }, + MsgType: schedulepb.MsgUnknown, + From: "owner-1", + } + + response = a.handleMessage([]*schedulepb.Message{unknownMessage}) + require.Len(t, response, 0) + + // staled message + heartbeat.Header.OwnerRevision.Revision = 1 + response = a.handleMessage([]*schedulepb.Message{heartbeat}) + require.Len(t, response, 0) } func TestAgentUpdateOwnerInfo(t *testing.T) { @@ -214,6 +338,107 @@ func TestAgentUpdateOwnerInfo(t *testing.T) { require.True(t, ok) } +func TestAgentTick(t *testing.T) { + t.Parallel() + + a := newBaseAgent4Test() + trans := newMockTrans() + mockTableExecutor := newMockTableExecutor() + a.trans = trans + a.tableExec = mockTableExecutor + + heartbeat := &schedulepb.Message{ + Header: &schedulepb.Message_Header{ + Version: a.ownerInfo.version, + OwnerRevision: a.ownerInfo.revision, + // first heartbeat from the owner, no processor epoch + ProcessorEpoch: schedulepb.ProcessorEpoch{}, + }, + MsgType: schedulepb.MsgHeartbeat, + From: a.ownerInfo.captureID, + Heartbeat: &schedulepb.Heartbeat{TableIDs: nil}, + } + + // receive first heartbeat from the owner + messages := []*schedulepb.Message{heartbeat} + trans.recvBuffer = append(trans.recvBuffer, messages...) + + require.NoError(t, a.Tick(context.Background())) + require.Len(t, trans.sendBuffer, 1) + heartbeatResponse := trans.sendBuffer[0] + trans.sendBuffer = trans.sendBuffer[:0] + + require.Equal(t, schedulepb.MsgHeartbeatResponse, heartbeatResponse.MsgType) + require.Equal(t, a.ownerInfo.captureID, heartbeatResponse.To) + require.Equal(t, a.captureID, heartbeatResponse.From) + + addTableRequest := &schedulepb.Message{ + Header: &schedulepb.Message_Header{ + Version: a.ownerInfo.version, + OwnerRevision: a.ownerInfo.revision, + ProcessorEpoch: a.epoch, + }, + MsgType: schedulepb.MsgDispatchTableRequest, + From: a.ownerInfo.captureID, + DispatchTableRequest: &schedulepb.DispatchTableRequest{ + Request: &schedulepb.DispatchTableRequest_AddTable{ + AddTable: &schedulepb.AddTableRequest{ + TableID: 1, + IsSecondary: true, + Checkpoint: &schedulepb.Checkpoint{}, + }, + }, + }, + } + + removeTableRequest := &schedulepb.Message{ + Header: &schedulepb.Message_Header{ + Version: a.ownerInfo.version, + OwnerRevision: a.ownerInfo.revision, + ProcessorEpoch: a.epoch, + }, + MsgType: schedulepb.MsgDispatchTableRequest, + From: a.ownerInfo.captureID, + DispatchTableRequest: &schedulepb.DispatchTableRequest{ + Request: &schedulepb.DispatchTableRequest_RemoveTable{ + RemoveTable: &schedulepb.RemoveTableRequest{ + TableID: 2, + }, + }, + }, + } + messages = append(messages, addTableRequest) + messages = append(messages, removeTableRequest) + trans.recvBuffer = append(trans.recvBuffer, messages...) 
+ + mockTableExecutor.On("AddTable", mock.Anything, + mock.Anything, mock.Anything, mock.Anything).Return(true, nil) + mockTableExecutor.On("IsAddTableFinished", mock.Anything, + mock.Anything, mock.Anything).Return(false, nil) + require.NoError(t, a.Tick(context.Background())) + responses := trans.sendBuffer[:len(trans.sendBuffer)] + trans.sendBuffer = trans.sendBuffer[:0] + require.Equal(t, schedulepb.MsgHeartbeatResponse, responses[0].MsgType) + + messages = messages[:0] + // this one should be ignored, since the previous one with the same tableID is not finished yet. + messages = append(messages, addTableRequest) + trans.recvBuffer = append(trans.recvBuffer, messages...) + + mockTableExecutor.ExpectedCalls = mockTableExecutor.ExpectedCalls[:1] + mockTableExecutor.On("IsAddTableFinished", mock.Anything, + mock.Anything, mock.Anything).Return(true, nil) + require.NoError(t, a.Tick(context.Background())) + responses = trans.sendBuffer[:len(trans.sendBuffer)] + trans.sendBuffer = trans.sendBuffer[:0] + require.Len(t, responses, 1) + require.Equal(t, schedulepb.MsgDispatchTableResponse, responses[0].MsgType) + resp, ok := responses[0].DispatchTableResponse. + Response.(*schedulepb.DispatchTableResponse_AddTable) + require.True(t, ok) + require.Equal(t, schedulepb.TableStatePrepared, resp.AddTable.Status.State) +} + // MockTableExecutor is a mock implementation of TableExecutor. type MockTableExecutor struct { mock.Mock @@ -223,8 +448,8 @@ type MockTableExecutor struct { tables map[model.TableID]pipeline.TableState } -// NewMockTableExecutor creates a new mock table executor. -func NewMockTableExecutor() *MockTableExecutor { +// newMockTableExecutor creates a new mock table executor. +func newMockTableExecutor() *MockTableExecutor { return &MockTableExecutor{ tables: map[model.TableID]pipeline.TableState{}, } @@ -311,6 +536,7 @@ func (e *MockTableExecutor) IsAddTableFinished(ctx context.Context, tableID mode func (e *MockTableExecutor) IsRemoveTableFinished(ctx context.Context, tableID model.TableID) (model.Ts, bool) { state, ok := e.tables[tableID] if !ok { + // the real `table executor` processor, would panic in such case. log.Warn("table to be removed is not found", zap.Int64("tableID", tableID)) return 0, true diff --git a/cdc/scheduler/internal/tp/coordinator_test.go b/cdc/scheduler/internal/tp/coordinator_test.go index 0a5b527cdcf..0db032be2c7 100644 --- a/cdc/scheduler/internal/tp/coordinator_test.go +++ b/cdc/scheduler/internal/tp/coordinator_test.go @@ -22,8 +22,15 @@ import ( ) type mockTrans struct { - send func(ctx context.Context, msgs []*schedulepb.Message) error - recv func(ctx context.Context) ([]*schedulepb.Message, error) + sendBuffer []*schedulepb.Message + recvBuffer []*schedulepb.Message +} + +func newMockTrans() *mockTrans { + return &mockTrans{ + sendBuffer: make([]*schedulepb.Message, 0), + recvBuffer: make([]*schedulepb.Message, 0), + } } func (m *mockTrans) Close() error { @@ -31,34 +38,35 @@ func (m *mockTrans) Close() error { } func (m *mockTrans) Send(ctx context.Context, msgs []*schedulepb.Message) error { - return m.send(ctx, msgs) + m.sendBuffer = append(m.sendBuffer, msgs...) 
+ return nil } func (m *mockTrans) Recv(ctx context.Context) ([]*schedulepb.Message, error) { - return m.recv(ctx) + messages := m.recvBuffer[:len(m.recvBuffer)] + m.recvBuffer = make([]*schedulepb.Message, 0) + return messages, nil } func TestCoordinatorSendMsgs(t *testing.T) { t.Parallel() ctx := context.Background() - trans := &mockTrans{} + trans := newMockTrans() cood := coordinator{ version: "6.2.0", revision: schedulepb.OwnerRevision{Revision: 3}, trans: trans, } - trans.send = func(ctx context.Context, msgs []*schedulepb.Message) error { - require.EqualValues(t, []*schedulepb.Message{{ - Header: &schedulepb.Message_Header{ - Version: cood.version, - OwnerRevision: cood.revision, - }, - To: "1", MsgType: schedulepb.MsgDispatchTableRequest, - }}, msgs) - return nil - } cood.sendMsgs( ctx, []*schedulepb.Message{{To: "1", MsgType: schedulepb.MsgDispatchTableRequest}}) + + require.EqualValues(t, []*schedulepb.Message{{ + Header: &schedulepb.Message_Header{ + Version: cood.version, + OwnerRevision: cood.revision, + }, + To: "1", MsgType: schedulepb.MsgDispatchTableRequest, + }}, trans.sendBuffer) } func TestCoordinatorRecvMsgs(t *testing.T) { @@ -71,21 +79,25 @@ func TestCoordinatorRecvMsgs(t *testing.T) { revision: schedulepb.OwnerRevision{Revision: 3}, trans: trans, } - trans.recv = func(ctx context.Context) ([]*schedulepb.Message, error) { - return []*schedulepb.Message{{ + + trans.recvBuffer = append(trans.recvBuffer, + &schedulepb.Message{ Header: &schedulepb.Message_Header{ OwnerRevision: cood.revision, }, From: "1", MsgType: schedulepb.MsgDispatchTableResponse, - }, { + }) + trans.recvBuffer = append(trans.recvBuffer, + &schedulepb.Message{ Header: &schedulepb.Message_Header{ OwnerRevision: schedulepb.OwnerRevision{Revision: 4}, }, From: "2", MsgType: schedulepb.MsgDispatchTableResponse, - }}, nil - } + }) + msgs, err := cood.recvMsgs(ctx) - require.Nil(t, err) + require.NoError(t, err) + require.EqualValues(t, []*schedulepb.Message{{ Header: &schedulepb.Message_Header{ OwnerRevision: cood.revision, From e127e08fa9685b1d8fc91c019f2a212b205ca2b6 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Wed, 1 Jun 2022 17:39:40 +0800 Subject: [PATCH 2/3] coverage to 79.1 --- cdc/scheduler/internal/tp/agent_test.go | 33 ++++++++++++++++--------- 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/cdc/scheduler/internal/tp/agent_test.go b/cdc/scheduler/internal/tp/agent_test.go index 880eb49f5f0..af6bd150ce3 100644 --- a/cdc/scheduler/internal/tp/agent_test.go +++ b/cdc/scheduler/internal/tp/agent_test.go @@ -259,11 +259,11 @@ func TestAgentHandleMessage(t *testing.T) { heartbeat := &schedulepb.Message{ Header: &schedulepb.Message_Header{ - Version: "version-1", - OwnerRevision: schedulepb.OwnerRevision{Revision: 1}, + Version: a.ownerInfo.version, + OwnerRevision: a.ownerInfo.revision, }, MsgType: schedulepb.MsgHeartbeat, - From: "owner-1", + From: a.ownerInfo.captureID, Heartbeat: &schedulepb.Heartbeat{}, } // handle the first heartbeat, from the known owner. 
@@ -276,12 +276,13 @@ func TestAgentHandleMessage(t *testing.T) { addTableRequest := &schedulepb.Message{ Header: &schedulepb.Message_Header{ - Version: "version-1", - OwnerRevision: schedulepb.OwnerRevision{Revision: 1}, - ProcessorEpoch: schedulepb.ProcessorEpoch{Epoch: "agent-epoch-1"}, + Version: a.ownerInfo.version, + OwnerRevision: a.ownerInfo.revision, + // wrong epoch + ProcessorEpoch: schedulepb.ProcessorEpoch{Epoch: "wrong-agent-epoch-1"}, }, MsgType: schedulepb.MsgDispatchTableRequest, - From: "owner-1", + From: a.ownerInfo.captureID, DispatchTableRequest: &schedulepb.DispatchTableRequest{ Request: &schedulepb.DispatchTableRequest_AddTable{ AddTable: &schedulepb.AddTableRequest{ @@ -292,9 +293,15 @@ func TestAgentHandleMessage(t *testing.T) { }, }, } - // add table request in pending + // wrong epoch, ignored response = a.handleMessage([]*schedulepb.Message{addTableRequest}) - require.Equal(t, len(a.runningTasks), 1) + require.Len(t, a.runningTasks, 0) + require.Len(t, response, 0) + + // correct epoch, processing. + addTableRequest.Header.ProcessorEpoch = a.epoch + response = a.handleMessage([]*schedulepb.Message{addTableRequest}) + require.Len(t, a.runningTasks, 1) require.Len(t, response, 0) heartbeat.Header.OwnerRevision.Revision = 2 @@ -305,12 +312,12 @@ func TestAgentHandleMessage(t *testing.T) { // this should never happen in real world unknownMessage := &schedulepb.Message{ Header: &schedulepb.Message_Header{ - Version: "version-1", + Version: a.ownerInfo.version, OwnerRevision: schedulepb.OwnerRevision{Revision: 2}, - ProcessorEpoch: schedulepb.ProcessorEpoch{Epoch: "agent-epoch-1"}, + ProcessorEpoch: a.epoch, }, MsgType: schedulepb.MsgUnknown, - From: "owner-1", + From: a.ownerInfo.captureID, } response = a.handleMessage([]*schedulepb.Message{unknownMessage}) @@ -437,6 +444,8 @@ func TestAgentTick(t *testing.T) { Response.(*schedulepb.DispatchTableResponse_AddTable) require.True(t, ok) require.Equal(t, schedulepb.TableStatePrepared, resp.AddTable.Status.State) + + require.NoError(t, a.Close()) } // MockTableExecutor is a mock implementation of TableExecutor. From f833afd51dcfffa8d4af046a95d5d21f6dfde681 Mon Sep 17 00:00:00 2001 From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com> Date: Wed, 1 Jun 2022 17:57:58 +0800 Subject: [PATCH 3/3] Update cdc/scheduler/internal/tp/agent.go Co-authored-by: Neil Shen --- cdc/scheduler/internal/tp/agent.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/scheduler/internal/tp/agent.go b/cdc/scheduler/internal/tp/agent.go index 5a6afd8e017..86736edc871 100644 --- a/cdc/scheduler/internal/tp/agent.go +++ b/cdc/scheduler/internal/tp/agent.go @@ -174,7 +174,7 @@ func (a *agent) handleMessage(msg []*schedulepb.Message) []*schedulepb.Message { case schedulepb.MsgHeartbeat: response := a.handleMessageHeartbeat(message.Heartbeat.GetTableIDs()) result = append(result, response) - case schedulepb.MsgUnknown: + default: log.Warn("tpscheduler: unknown message received", zap.String("capture", a.captureID), zap.String("namespace", a.changeFeedID.Namespace),
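Illustrative sketch (not part of the patches above; all names and types here are simplified stand-ins): patch 1 changes agent.recvMsgs so that messages carrying stale owner info are dropped via handleOwnerInfo and the surviving messages are compacted in place. The standalone Go program below shows that in-place filtering pattern with assumed stand-in types, since the real schedulepb.Message and owner-revision bookkeeping are not reproduced here.

package main

import "fmt"

// message is a stand-in for schedulepb.Message: only the fields the
// filtering logic needs are kept.
type message struct {
	from          string
	ownerRevision int64
}

// filterStale keeps messages whose owner revision is not older than the
// revision the agent currently trusts, compacting the slice in place so no
// second slice is allocated (the same pattern recvMsgs uses).
func filterStale(msgs []message, knownRevision int64) []message {
	n := 0
	for _, m := range msgs {
		if m.ownerRevision < knownRevision {
			// stale owner info: skip the message entirely
			continue
		}
		msgs[n] = m
		n++
	}
	return msgs[:n]
}

func main() {
	msgs := []message{
		{from: "owner-1", ownerRevision: 1}, // stale, will be dropped
		{from: "owner-2", ownerRevision: 2},
	}
	fmt.Println(filterStale(msgs, 2)) // prints [{owner-2 2}]
}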