Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fb/latency(cdc): add more unit test to agent #5703

Merged
merged 3 commits into from
Jun 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 5 additions & 15 deletions cdc/scheduler/internal/tp/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/pingcap/tiflow/cdc/processor/pipeline"
"github.com/pingcap/tiflow/cdc/scheduler/internal"
"github.com/pingcap/tiflow/cdc/scheduler/internal/tp/schedulepb"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/p2p"
Expand Down Expand Up @@ -83,15 +82,6 @@ func NewAgent(ctx context.Context,
}
result.trans = trans

conf := config.GetGlobalServerConfig()
flushInterval := time.Duration(conf.ProcessorFlushInterval)

log.Debug("tpscheduler: creating processor agent",
zap.String("capture", captureID),
zap.String("namespace", changeFeedID.Namespace),
zap.String("changefeed", changeFeedID.ID),
zap.Duration("sendCheckpointTsInterval", flushInterval))

etcdCliCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

Expand Down Expand Up @@ -184,7 +174,6 @@ func (a *agent) handleMessage(msg []*schedulepb.Message) []*schedulepb.Message {
case schedulepb.MsgHeartbeat:
response := a.handleMessageHeartbeat(message.Heartbeat.GetTableIDs())
result = append(result, response)
case schedulepb.MsgUnknown:
default:
log.Warn("tpscheduler: unknown message received",
zap.String("capture", a.captureID),
Expand Down Expand Up @@ -579,11 +568,12 @@ func (a *agent) recvMsgs(ctx context.Context) ([]*schedulepb.Message, error) {

n := 0
for _, val := range messages {
// Filter stale messages.
if val.Header.OwnerRevision == a.ownerInfo.revision {
messages[n] = val
n++
// drop stale messages: only keep those from the current (or a newer) owner
if !a.handleOwnerInfo(val.From, val.Header.OwnerRevision.Revision, val.Header.Version) {
continue
}
messages[n] = val
n++
}
return messages[:n], nil
}
Expand Down
2 changes: 1 addition & 1 deletion cdc/scheduler/internal/tp/agent_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
func benchmarkCollectTableStatus(b *testing.B, bench func(b *testing.B, a *agent)) {
upperBound := 16384
for size := 1; size <= upperBound; size *= 2 {
tableExec := NewMockTableExecutor()
tableExec := newMockTableExecutor()
a := &agent{
tableExec: tableExec,
}
Expand Down
259 changes: 247 additions & 12 deletions cdc/scheduler/internal/tp/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ package tp

import (
"context"
"sort"
"testing"

"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/pipeline"
"github.com/pingcap/tiflow/cdc/scheduler/internal/base"
"github.com/pingcap/tiflow/cdc/scheduler/internal/tp/schedulepb"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand All @@ -36,16 +36,56 @@ func newBaseAgent4Test() *agent {
},
version: "agent-version-1",
epoch: schedulepb.ProcessorEpoch{Epoch: "agent-epoch-1"},
captureID: "agent-1",
runningTasks: make(map[model.TableID]*dispatchTableTask),
}
}

// TestAgentCollectTableStatus checks that collectTableStatus reports the
// executor-known state for tracked tables and TableStateAbsent for tables the
// executor has never seen, and that a pending remove task makes
// newTableStatus report the table as stopping.
func TestAgentCollectTableStatus(t *testing.T) {
	t.Parallel()

	a := newBaseAgent4Test()

	executor := newMockTableExecutor()
	a.tableExec = executor

	// Seed the executor with one table per known pipeline state, table IDs 0..4.
	seeded := []pipeline.TableState{
		pipeline.TableStatePreparing,
		pipeline.TableStatePrepared,
		pipeline.TableStateReplicating,
		pipeline.TableStateStopping,
		pipeline.TableStateStopped,
	}
	for id, state := range seeded {
		executor.tables[model.TableID(id)] = state
	}

	// Query ten tables; IDs 5..9 are unknown to the executor.
	queried := make([]model.TableID, 0, 10)
	for id := 0; id < 10; id++ {
		queried = append(queried, model.TableID(id))
	}

	result := a.collectTableStatus(queried)
	require.Len(t, result, 10)
	sort.Slice(result, func(i, j int) bool {
		return result[i].TableID < result[j].TableID
	})

	wanted := []schedulepb.TableState{
		schedulepb.TableStatePreparing,
		schedulepb.TableStatePrepared,
		schedulepb.TableStateReplicating,
		schedulepb.TableStateStopping,
		schedulepb.TableStateStopped,
	}
	for i := 0; i < 10; i++ {
		if i < len(wanted) {
			require.Equal(t, wanted[i], result[i].State)
		} else {
			// Tables the executor does not track are reported as absent.
			require.Equal(t, schedulepb.TableStateAbsent, result[i].State)
		}
	}

	// A pending remove task forces the reported state to stopping.
	a.runningTasks[model.TableID(0)] = &dispatchTableTask{IsRemove: true}
	status := a.newTableStatus(model.TableID(0))
	require.Equal(t, schedulepb.TableStateStopping, status.State)
}

func TestAgentHandleDispatchTableTask(t *testing.T) {
t.Parallel()

a := newBaseAgent4Test()

mockTableExecutor := NewMockTableExecutor()
mockTableExecutor := newMockTableExecutor()
a.tableExec = mockTableExecutor

tableID := model.TableID(1)
Expand Down Expand Up @@ -146,11 +186,12 @@ func TestAgentHandleDispatchTableTask(t *testing.T) {
}
}

func TestAgentHandleMessage(t *testing.T) {
func TestAgentHandleMessageStopping(t *testing.T) {
t.Parallel()

a := newBaseAgent4Test()
a.tableExec = base.NewMockTableExecutor(t)
a.tableExec = newMockTableExecutor()
a.stopping = true

heartbeat := &schedulepb.Message{
Header: &schedulepb.Message_Header{
Expand All @@ -161,13 +202,11 @@ func TestAgentHandleMessage(t *testing.T) {
From: "owner-1",
Heartbeat: &schedulepb.Heartbeat{},
}
// handle the first heartbeat, from the known owner.
response := a.handleMessage([]*schedulepb.Message{heartbeat})
require.Len(t, response, 1)
require.NotNil(t, response[0].HeartbeatResponse)
require.Equal(t, response[0].Header.Version, a.version)
require.Equal(t, response[0].Header.OwnerRevision, a.ownerInfo.revision)
require.Equal(t, response[0].Header.ProcessorEpoch, a.epoch)
// agent is stopping, let coordinator know this.
require.True(t, response[0].HeartbeatResponse.IsStopping)

addTableRequest := &schedulepb.Message{
Header: &schedulepb.Message_Header{
Expand All @@ -187,15 +226,107 @@ func TestAgentHandleMessage(t *testing.T) {
},
},
}
// add table request in pending
// add table request should not be handled, so the running task count is 0.
response = a.handleMessage([]*schedulepb.Message{addTableRequest})
require.Equal(t, len(a.runningTasks), 1)
require.Len(t, a.runningTasks, 0)

// mock the agent having received a running task before stopping that has not been processed yet.
a.runningTasks[model.TableID(1)] = &dispatchTableTask{
TableID: model.TableID(1),
StartTs: 0,
IsRemove: false,
IsPrepare: false,
Epoch: schedulepb.ProcessorEpoch{},
status: dispatchTableTaskReceived,
}

result, err := a.handleDispatchTableTasks(context.Background())
require.NoError(t, err)
require.Len(t, a.runningTasks, 0)
require.Len(t, result, 1)

addTableResponse, ok := result[0].DispatchTableResponse.
Response.(*schedulepb.DispatchTableResponse_AddTable)
require.True(t, ok)
require.True(t, addTableResponse.AddTable.Reject)
}

// TestAgentHandleMessage exercises the agent's inbound message handling:
// heartbeats from the known owner, dispatch requests with a wrong and then a
// correct processor epoch, an owner revision bump, an unknown message type,
// and finally a stale message from an outdated owner revision.
func TestAgentHandleMessage(t *testing.T) {
	t.Parallel()

	ag := newBaseAgent4Test()
	ag.tableExec = newMockTableExecutor()

	hb := &schedulepb.Message{
		Header: &schedulepb.Message_Header{
			Version:       ag.ownerInfo.version,
			OwnerRevision: ag.ownerInfo.revision,
		},
		MsgType:   schedulepb.MsgHeartbeat,
		From:      ag.ownerInfo.captureID,
		Heartbeat: &schedulepb.Heartbeat{},
	}

	// The first heartbeat from the known owner is answered with a heartbeat
	// response carrying the agent's version, owner revision, and epoch.
	out := ag.handleMessage([]*schedulepb.Message{hb})
	require.Len(t, out, 1)
	require.NotNil(t, out[0].HeartbeatResponse)
	require.Equal(t, out[0].Header.Version, ag.version)
	require.Equal(t, out[0].Header.OwnerRevision, ag.ownerInfo.revision)
	require.Equal(t, out[0].Header.ProcessorEpoch, ag.epoch)

	addReq := &schedulepb.Message{
		Header: &schedulepb.Message_Header{
			Version:       ag.ownerInfo.version,
			OwnerRevision: ag.ownerInfo.revision,
			// wrong epoch
			ProcessorEpoch: schedulepb.ProcessorEpoch{Epoch: "wrong-agent-epoch-1"},
		},
		MsgType: schedulepb.MsgDispatchTableRequest,
		From:    ag.ownerInfo.captureID,
		DispatchTableRequest: &schedulepb.DispatchTableRequest{
			Request: &schedulepb.DispatchTableRequest_AddTable{
				AddTable: &schedulepb.AddTableRequest{
					TableID:     1,
					IsSecondary: true,
					Checkpoint:  &schedulepb.Checkpoint{},
				},
			},
		},
	}

	// A dispatch request carrying a mismatched epoch is dropped silently.
	out = ag.handleMessage([]*schedulepb.Message{addReq})
	require.Len(t, ag.runningTasks, 0)
	require.Len(t, out, 0)

	// With the correct epoch the request is accepted as a running task.
	addReq.Header.ProcessorEpoch = ag.epoch
	out = ag.handleMessage([]*schedulepb.Message{addReq})
	require.Len(t, ag.runningTasks, 1)
	require.Len(t, out, 0)

	// A heartbeat announcing a newer owner revision is still handled and the
	// pending task is kept.
	hb.Header.OwnerRevision.Revision = 2
	out = ag.handleMessage([]*schedulepb.Message{hb})
	require.Len(t, ag.runningTasks, 1)
	require.Len(t, out, 1)

	// this should never happen in real world
	unknown := &schedulepb.Message{
		Header: &schedulepb.Message_Header{
			Version:        ag.ownerInfo.version,
			OwnerRevision:  schedulepb.OwnerRevision{Revision: 2},
			ProcessorEpoch: ag.epoch,
		},
		MsgType: schedulepb.MsgUnknown,
		From:    ag.ownerInfo.captureID,
	}
	out = ag.handleMessage([]*schedulepb.Message{unknown})
	require.Len(t, out, 0)

	// A message from an older owner revision is stale and ignored.
	hb.Header.OwnerRevision.Revision = 1
	out = ag.handleMessage([]*schedulepb.Message{hb})
	require.Len(t, out, 0)
}

func TestAgentUpdateOwnerInfo(t *testing.T) {
Expand All @@ -214,6 +345,109 @@ func TestAgentUpdateOwnerInfo(t *testing.T) {
require.True(t, ok)
}

// TestAgentTick drives the agent through full Tick cycles over a mock
// transport and a mock table executor, verifying the messages the agent
// pushes into the transport's send buffer: a heartbeat response first, then
// a dispatch-table response once the add-table operation finishes.
func TestAgentTick(t *testing.T) {
	t.Parallel()

	a := newBaseAgent4Test()
	trans := newMockTrans()
	mockTableExecutor := newMockTableExecutor()
	a.trans = trans
	a.tableExec = mockTableExecutor

	heartbeat := &schedulepb.Message{
		Header: &schedulepb.Message_Header{
			Version:       a.ownerInfo.version,
			OwnerRevision: a.ownerInfo.revision,
			// first heartbeat from the owner, no processor epoch
			ProcessorEpoch: schedulepb.ProcessorEpoch{},
		},
		MsgType:   schedulepb.MsgHeartbeat,
		From:      a.ownerInfo.captureID,
		Heartbeat: &schedulepb.Heartbeat{TableIDs: nil},
	}

	// receive first heartbeat from the owner
	messages := []*schedulepb.Message{heartbeat}
	trans.recvBuffer = append(trans.recvBuffer, messages...)

	require.NoError(t, a.Tick(context.Background()))
	// The only outgoing message is the heartbeat response.
	require.Len(t, trans.sendBuffer, 1)
	heartbeatResponse := trans.sendBuffer[0]
	// Drain the send buffer so later ticks can be inspected in isolation.
	trans.sendBuffer = trans.sendBuffer[:0]

	require.Equal(t, schedulepb.MsgHeartbeatResponse, heartbeatResponse.MsgType)
	require.Equal(t, a.ownerInfo.captureID, heartbeatResponse.To)
	require.Equal(t, a.captureID, heartbeatResponse.From)

	addTableRequest := &schedulepb.Message{
		Header: &schedulepb.Message_Header{
			Version:        a.ownerInfo.version,
			OwnerRevision:  a.ownerInfo.revision,
			ProcessorEpoch: a.epoch,
		},
		MsgType: schedulepb.MsgDispatchTableRequest,
		From:    a.ownerInfo.captureID,
		DispatchTableRequest: &schedulepb.DispatchTableRequest{
			Request: &schedulepb.DispatchTableRequest_AddTable{
				AddTable: &schedulepb.AddTableRequest{
					TableID:     1,
					IsSecondary: true,
					Checkpoint:  &schedulepb.Checkpoint{},
				},
			},
		},
	}

	// Table 2 was never added to the mock executor, so this remove targets an
	// untracked table.
	removeTableRequest := &schedulepb.Message{
		Header: &schedulepb.Message_Header{
			Version:        a.ownerInfo.version,
			OwnerRevision:  a.ownerInfo.revision,
			ProcessorEpoch: a.epoch,
		},
		MsgType: schedulepb.MsgDispatchTableRequest,
		From:    a.ownerInfo.captureID,
		DispatchTableRequest: &schedulepb.DispatchTableRequest{
			Request: &schedulepb.DispatchTableRequest_RemoveTable{
				RemoveTable: &schedulepb.RemoveTableRequest{
					TableID: 2,
				},
			},
		},
	}
	// NOTE: `messages` still holds the first heartbeat, so this tick receives
	// the heartbeat again together with the add and remove requests.
	messages = append(messages, addTableRequest)
	messages = append(messages, removeTableRequest)
	trans.recvBuffer = append(trans.recvBuffer, messages...)

	// AddTable is accepted but IsAddTableFinished reports not-done, so the
	// add stays in flight after this tick.
	mockTableExecutor.On("AddTable", mock.Anything,
		mock.Anything, mock.Anything, mock.Anything).Return(true, nil)
	mockTableExecutor.On("IsAddTableFinished", mock.Anything,
		mock.Anything, mock.Anything).Return(false, nil)
	require.NoError(t, a.Tick(context.Background()))
	responses := trans.sendBuffer[:len(trans.sendBuffer)]
	trans.sendBuffer = trans.sendBuffer[:0]
	require.Equal(t, schedulepb.MsgHeartbeatResponse, responses[0].MsgType)

	messages = messages[:0]
	// this one should be ignored, since the previous one with the same tableID is not finished yet.
	messages = append(messages, addTableRequest)
	trans.recvBuffer = append(trans.recvBuffer, messages...)

	// Keep only the first registered expectation (AddTable) and make
	// IsAddTableFinished now report completion, so the pending add finishes.
	mockTableExecutor.ExpectedCalls = mockTableExecutor.ExpectedCalls[:1]
	mockTableExecutor.On("IsAddTableFinished", mock.Anything,
		mock.Anything, mock.Anything).Return(true, nil)
	require.NoError(t, a.Tick(context.Background()))
	responses = trans.sendBuffer[:len(trans.sendBuffer)]
	trans.sendBuffer = trans.sendBuffer[:0]
	// The finished add yields exactly one dispatch response in Prepared state
	// (the add was issued with IsSecondary: true).
	require.Len(t, responses, 1)
	require.Equal(t, schedulepb.MsgDispatchTableResponse, responses[0].MsgType)
	resp, ok := responses[0].DispatchTableResponse.
		Response.(*schedulepb.DispatchTableResponse_AddTable)
	require.True(t, ok)
	require.Equal(t, schedulepb.TableStatePrepared, resp.AddTable.Status.State)

	require.NoError(t, a.Close())
}

// MockTableExecutor is a mock implementation of TableExecutor.
type MockTableExecutor struct {
mock.Mock
Expand All @@ -223,8 +457,8 @@ type MockTableExecutor struct {
tables map[model.TableID]pipeline.TableState
}

// NewMockTableExecutor creates a new mock table executor.
func NewMockTableExecutor() *MockTableExecutor {
// newMockTableExecutor creates a new mock table executor.
func newMockTableExecutor() *MockTableExecutor {
return &MockTableExecutor{
tables: map[model.TableID]pipeline.TableState{},
}
Expand Down Expand Up @@ -311,6 +545,7 @@ func (e *MockTableExecutor) IsAddTableFinished(ctx context.Context, tableID mode
func (e *MockTableExecutor) IsRemoveTableFinished(ctx context.Context, tableID model.TableID) (model.Ts, bool) {
state, ok := e.tables[tableID]
if !ok {
// the real `table executor` processor, would panic in such case.
log.Warn("table to be removed is not found",
zap.Int64("tableID", tableID))
return 0, true
Expand Down
Loading