fb/latency(cdc): add more unit test to agent (pingcap#5703)
* add more tests.

* coverage to 79.1%

* Update cdc/scheduler/internal/tp/agent.go

Co-authored-by: Neil Shen <overvenus@gmail.com>
3AceShowHand and overvenus committed Jun 24, 2022
1 parent bfe4e0a commit f5c3f58
Showing 4 changed files with 286 additions and 49 deletions.
20 changes: 5 additions & 15 deletions cdc/scheduler/internal/tp/agent.go
@@ -24,7 +24,6 @@ import (
"github.com/pingcap/tiflow/cdc/processor/pipeline"
"github.com/pingcap/tiflow/cdc/scheduler/internal"
"github.com/pingcap/tiflow/cdc/scheduler/internal/tp/schedulepb"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/p2p"
@@ -83,15 +82,6 @@ func NewAgent(ctx context.Context,
}
result.trans = trans

conf := config.GetGlobalServerConfig()
flushInterval := time.Duration(conf.ProcessorFlushInterval)

log.Debug("tpscheduler: creating processor agent",
zap.String("capture", captureID),
zap.String("namespace", changeFeedID.Namespace),
zap.String("changefeed", changeFeedID.ID),
zap.Duration("sendCheckpointTsInterval", flushInterval))

etcdCliCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

@@ -184,7 +174,6 @@ func (a *agent) handleMessage(msg []*schedulepb.Message) []*schedulepb.Message {
case schedulepb.MsgHeartbeat:
response := a.handleMessageHeartbeat(message.Heartbeat.GetTableIDs())
result = append(result, response)
case schedulepb.MsgUnknown:
default:
log.Warn("tpscheduler: unknown message received",
zap.String("capture", a.captureID),
@@ -579,11 +568,12 @@ func (a *agent) recvMsgs(ctx context.Context) ([]*schedulepb.Message, error) {

n := 0
for _, val := range messages {
// Filter stale messages.
if val.Header.OwnerRevision == a.ownerInfo.revision {
messages[n] = val
n++
// only keep messages from the current owner; drop stale ones
if !a.handleOwnerInfo(val.From, val.Header.OwnerRevision.Revision, val.Header.Version) {
continue
}
messages[n] = val
n++
}
return messages[:n], nil
}
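
The filtering change above is the behavioral core of this hunk: instead of comparing the owner revision for strict equality, recvMsgs now defers to handleOwnerInfo, so a message from a newer owner can update the agent's view of the owner before the filter decides to keep it. A minimal, self-contained sketch of that in-place filtering idiom (message and isFromCurrentOwner are illustrative stand-ins, not the repository's types):

package main

import "fmt"

type message struct {
	from     string
	revision int64
}

// isFromCurrentOwner stands in for agent.handleOwnerInfo: it reports whether
// the message comes from an owner at least as new as the one currently
// tracked, adopting a newer owner as a side effect.
func isFromCurrentOwner(current *int64, m message) bool {
	if m.revision > *current {
		*current = m.revision // a newer owner takes over
	}
	return m.revision == *current
}

func main() {
	current := int64(2)
	msgs := []message{{"owner-1", 1}, {"owner-2", 2}, {"owner-3", 3}}

	// Filter in place, in the spirit of the loop in recvMsgs: keep messages
	// from the current owner, drop stale ones.
	n := 0
	for _, m := range msgs {
		if !isFromCurrentOwner(&current, m) {
			continue // stale message, dropped
		}
		msgs[n] = m
		n++
	}
	fmt.Println(msgs[:n]) // [{owner-2 2} {owner-3 3}]
}

The in-place compaction reuses the backing array instead of allocating a second slice, which matters on a hot receive path.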
2 changes: 1 addition & 1 deletion cdc/scheduler/internal/tp/agent_bench_test.go
@@ -24,7 +24,7 @@ import (
func benchmarkCollectTableStatus(b *testing.B, bench func(b *testing.B, a *agent)) {
upperBound := 16384
for size := 1; size <= upperBound; size *= 2 {
tableExec := NewMockTableExecutor()
tableExec := newMockTableExecutor()
a := &agent{
tableExec: tableExec,
}
259 changes: 247 additions & 12 deletions cdc/scheduler/internal/tp/agent_test.go
@@ -15,12 +15,12 @@ package tp

import (
"context"
"sort"
"testing"

"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/pipeline"
"github.com/pingcap/tiflow/cdc/scheduler/internal/base"
"github.com/pingcap/tiflow/cdc/scheduler/internal/tp/schedulepb"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
@@ -36,16 +36,56 @@ func newBaseAgent4Test() *agent {
},
version: "agent-version-1",
epoch: schedulepb.ProcessorEpoch{Epoch: "agent-epoch-1"},
captureID: "agent-1",
runningTasks: make(map[model.TableID]*dispatchTableTask),
}
}

func TestAgentCollectTableStatus(t *testing.T) {
t.Parallel()

a := newBaseAgent4Test()

mockTableExecutor := newMockTableExecutor()
a.tableExec = mockTableExecutor

mockTableExecutor.tables[model.TableID(0)] = pipeline.TableStatePreparing
mockTableExecutor.tables[model.TableID(1)] = pipeline.TableStatePrepared
mockTableExecutor.tables[model.TableID(2)] = pipeline.TableStateReplicating
mockTableExecutor.tables[model.TableID(3)] = pipeline.TableStateStopping
mockTableExecutor.tables[model.TableID(4)] = pipeline.TableStateStopped

expected := make([]model.TableID, 0, 10)
for i := 0; i < 10; i++ {
expected = append(expected, model.TableID(i))
}

result := a.collectTableStatus(expected)
require.Len(t, result, 10)
sort.Slice(result, func(i, j int) bool {
return result[i].TableID < result[j].TableID
})
require.Equal(t, schedulepb.TableStatePreparing, result[0].State)
require.Equal(t, schedulepb.TableStatePrepared, result[1].State)
require.Equal(t, schedulepb.TableStateReplicating, result[2].State)
require.Equal(t, schedulepb.TableStateStopping, result[3].State)
require.Equal(t, schedulepb.TableStateStopped, result[4].State)
for i := 5; i < 10; i++ {
require.Equal(t, schedulepb.TableStateAbsent, result[i].State)
}

a.runningTasks[model.TableID(0)] = &dispatchTableTask{IsRemove: true}
status := a.newTableStatus(model.TableID(0))
require.Equal(t, schedulepb.TableStateStopping, status.State)

}
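
The assertions above encode a one-to-one mapping from pipeline table states to their schedulepb wire counterparts, with unknown tables reported as absent. A sketch of the conversion those assertions imply, assuming the surrounding file's imports and using only identifiers that already appear in this diff (the actual logic inside collectTableStatus may be shaped differently):

// tableStateToProto is a hypothetical helper illustrating the mapping the
// test asserts; it is not the repository's implementation.
func tableStateToProto(s pipeline.TableState) schedulepb.TableState {
	switch s {
	case pipeline.TableStatePreparing:
		return schedulepb.TableStatePreparing
	case pipeline.TableStatePrepared:
		return schedulepb.TableStatePrepared
	case pipeline.TableStateReplicating:
		return schedulepb.TableStateReplicating
	case pipeline.TableStateStopping:
		return schedulepb.TableStateStopping
	case pipeline.TableStateStopped:
		return schedulepb.TableStateStopped
	default:
		// tables the executor does not know about are reported as absent
		return schedulepb.TableStateAbsent
	}
}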

func TestAgentHandleDispatchTableTask(t *testing.T) {
t.Parallel()

a := newBaseAgent4Test()

mockTableExecutor := NewMockTableExecutor()
mockTableExecutor := newMockTableExecutor()
a.tableExec = mockTableExecutor

tableID := model.TableID(1)
@@ -146,11 +186,12 @@ func TestAgentHandleDispatchTableTask(t *testing.T) {
}
}

func TestAgentHandleMessage(t *testing.T) {
func TestAgentHandleMessageStopping(t *testing.T) {
t.Parallel()

a := newBaseAgent4Test()
a.tableExec = base.NewMockTableExecutor(t)
a.tableExec = newMockTableExecutor()
a.stopping = true

heartbeat := &schedulepb.Message{
Header: &schedulepb.Message_Header{
@@ -161,13 +202,11 @@ func TestAgentHandleMessage(t *testing.T) {
From: "owner-1",
Heartbeat: &schedulepb.Heartbeat{},
}
// handle the first heartbeat, from the known owner.
response := a.handleMessage([]*schedulepb.Message{heartbeat})
require.Len(t, response, 1)
require.NotNil(t, response[0].HeartbeatResponse)
require.Equal(t, response[0].Header.Version, a.version)
require.Equal(t, response[0].Header.OwnerRevision, a.ownerInfo.revision)
require.Equal(t, response[0].Header.ProcessorEpoch, a.epoch)
// agent is stopping, let coordinator know this.
require.True(t, response[0].HeartbeatResponse.IsStopping)

addTableRequest := &schedulepb.Message{
Header: &schedulepb.Message_Header{
@@ -187,15 +226,107 @@ func TestAgentHandleMessage(t *testing.T) {
},
},
}
// add table request in pending
// add table request should not be handled, so the running task count is 0.
response = a.handleMessage([]*schedulepb.Message{addTableRequest})
require.Equal(t, len(a.runningTasks), 1)
require.Len(t, a.runningTasks, 0)

// mock that the agent still has a running task, received before stopping but not yet processed.
a.runningTasks[model.TableID(1)] = &dispatchTableTask{
TableID: model.TableID(1),
StartTs: 0,
IsRemove: false,
IsPrepare: false,
Epoch: schedulepb.ProcessorEpoch{},
status: dispatchTableTaskReceived,
}

result, err := a.handleDispatchTableTasks(context.Background())
require.NoError(t, err)
require.Len(t, a.runningTasks, 0)
require.Len(t, result, 1)

addTableResponse, ok := result[0].DispatchTableResponse.
Response.(*schedulepb.DispatchTableResponse_AddTable)
require.True(t, ok)
require.True(t, addTableResponse.AddTable.Reject)
}
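
Taken together, this test pins down a small shutdown protocol: a stopping agent still answers heartbeats, flagging IsStopping, but drains its queued dispatch tasks and rejects pending add-table work. A self-contained sketch of that drain-and-reject shape (all names are illustrative, not the repository's):

package main

import "fmt"

type task struct {
	tableID  int64
	isRemove bool
}

type response struct {
	tableID int64
	reject  bool
}

// drainAndReject empties a stopping agent's running-task queue, answering
// pending add-table tasks with a rejection, the behavior the test asserts.
func drainAndReject(tasks map[int64]*task) []response {
	var out []response
	for id, t := range tasks {
		delete(tasks, id)
		if !t.isRemove {
			out = append(out, response{tableID: id, reject: true})
		}
	}
	return out
}

func main() {
	tasks := map[int64]*task{1: {tableID: 1, isRemove: false}}
	fmt.Println(drainAndReject(tasks), len(tasks)) // [{1 true}] 0
}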

func TestAgentHandleMessage(t *testing.T) {
t.Parallel()

a := newBaseAgent4Test()
a.tableExec = newMockTableExecutor()

heartbeat := &schedulepb.Message{
Header: &schedulepb.Message_Header{
Version: a.ownerInfo.version,
OwnerRevision: a.ownerInfo.revision,
},
MsgType: schedulepb.MsgHeartbeat,
From: a.ownerInfo.captureID,
Heartbeat: &schedulepb.Heartbeat{},
}
// handle the first heartbeat, from the known owner.
response := a.handleMessage([]*schedulepb.Message{heartbeat})
require.Len(t, response, 1)
require.NotNil(t, response[0].HeartbeatResponse)
require.Equal(t, response[0].Header.Version, a.version)
require.Equal(t, response[0].Header.OwnerRevision, a.ownerInfo.revision)
require.Equal(t, response[0].Header.ProcessorEpoch, a.epoch)

addTableRequest := &schedulepb.Message{
Header: &schedulepb.Message_Header{
Version: a.ownerInfo.version,
OwnerRevision: a.ownerInfo.revision,
// wrong epoch
ProcessorEpoch: schedulepb.ProcessorEpoch{Epoch: "wrong-agent-epoch-1"},
},
MsgType: schedulepb.MsgDispatchTableRequest,
From: a.ownerInfo.captureID,
DispatchTableRequest: &schedulepb.DispatchTableRequest{
Request: &schedulepb.DispatchTableRequest_AddTable{
AddTable: &schedulepb.AddTableRequest{
TableID: 1,
IsSecondary: true,
Checkpoint: &schedulepb.Checkpoint{},
},
},
},
}
// wrong epoch, ignored
response = a.handleMessage([]*schedulepb.Message{addTableRequest})
require.Len(t, a.runningTasks, 0)
require.Len(t, response, 0)

// correct epoch, processing.
addTableRequest.Header.ProcessorEpoch = a.epoch
response = a.handleMessage([]*schedulepb.Message{addTableRequest})
require.Len(t, a.runningTasks, 1)
require.Len(t, response, 0)

heartbeat.Header.OwnerRevision.Revision = 2
response = a.handleMessage([]*schedulepb.Message{heartbeat})
require.Equal(t, len(a.runningTasks), 1)
require.Len(t, response, 1)

// this should never happen in the real world
unknownMessage := &schedulepb.Message{
Header: &schedulepb.Message_Header{
Version: a.ownerInfo.version,
OwnerRevision: schedulepb.OwnerRevision{Revision: 2},
ProcessorEpoch: a.epoch,
},
MsgType: schedulepb.MsgUnknown,
From: a.ownerInfo.captureID,
}

response = a.handleMessage([]*schedulepb.Message{unknownMessage})
require.Len(t, response, 0)

// stale message
heartbeat.Header.OwnerRevision.Revision = 1
response = a.handleMessage([]*schedulepb.Message{heartbeat})
require.Len(t, response, 0)
}

func TestAgentUpdateOwnerInfo(t *testing.T) {
@@ -214,6 +345,109 @@ func TestAgentUpdateOwnerInfo(t *testing.T) {
require.True(t, ok)
}

func TestAgentTick(t *testing.T) {
t.Parallel()

a := newBaseAgent4Test()
trans := newMockTrans()
mockTableExecutor := newMockTableExecutor()
a.trans = trans
a.tableExec = mockTableExecutor

heartbeat := &schedulepb.Message{
Header: &schedulepb.Message_Header{
Version: a.ownerInfo.version,
OwnerRevision: a.ownerInfo.revision,
// first heartbeat from the owner, no processor epoch
ProcessorEpoch: schedulepb.ProcessorEpoch{},
},
MsgType: schedulepb.MsgHeartbeat,
From: a.ownerInfo.captureID,
Heartbeat: &schedulepb.Heartbeat{TableIDs: nil},
}

// receive first heartbeat from the owner
messages := []*schedulepb.Message{heartbeat}
trans.recvBuffer = append(trans.recvBuffer, messages...)

require.NoError(t, a.Tick(context.Background()))
require.Len(t, trans.sendBuffer, 1)
heartbeatResponse := trans.sendBuffer[0]
trans.sendBuffer = trans.sendBuffer[:0]

require.Equal(t, schedulepb.MsgHeartbeatResponse, heartbeatResponse.MsgType)
require.Equal(t, a.ownerInfo.captureID, heartbeatResponse.To)
require.Equal(t, a.captureID, heartbeatResponse.From)

addTableRequest := &schedulepb.Message{
Header: &schedulepb.Message_Header{
Version: a.ownerInfo.version,
OwnerRevision: a.ownerInfo.revision,
ProcessorEpoch: a.epoch,
},
MsgType: schedulepb.MsgDispatchTableRequest,
From: a.ownerInfo.captureID,
DispatchTableRequest: &schedulepb.DispatchTableRequest{
Request: &schedulepb.DispatchTableRequest_AddTable{
AddTable: &schedulepb.AddTableRequest{
TableID: 1,
IsSecondary: true,
Checkpoint: &schedulepb.Checkpoint{},
},
},
},
}

removeTableRequest := &schedulepb.Message{
Header: &schedulepb.Message_Header{
Version: a.ownerInfo.version,
OwnerRevision: a.ownerInfo.revision,
ProcessorEpoch: a.epoch,
},
MsgType: schedulepb.MsgDispatchTableRequest,
From: a.ownerInfo.captureID,
DispatchTableRequest: &schedulepb.DispatchTableRequest{
Request: &schedulepb.DispatchTableRequest_RemoveTable{
RemoveTable: &schedulepb.RemoveTableRequest{
TableID: 2,
},
},
},
}
messages = append(messages, addTableRequest)
messages = append(messages, removeTableRequest)
trans.recvBuffer = append(trans.recvBuffer, messages...)

mockTableExecutor.On("AddTable", mock.Anything,
mock.Anything, mock.Anything, mock.Anything).Return(true, nil)
mockTableExecutor.On("IsAddTableFinished", mock.Anything,
mock.Anything, mock.Anything).Return(false, nil)
require.NoError(t, a.Tick(context.Background()))
responses := trans.sendBuffer[:len(trans.sendBuffer)]
trans.sendBuffer = trans.sendBuffer[:0]
require.Equal(t, schedulepb.MsgHeartbeatResponse, responses[0].MsgType)

messages = messages[:0]
// this one should be ignored, since the previous one with the same tableID is not finished yet.
messages = append(messages, addTableRequest)
trans.recvBuffer = append(trans.recvBuffer, messages...)

mockTableExecutor.ExpectedCalls = mockTableExecutor.ExpectedCalls[:1]
mockTableExecutor.On("IsAddTableFinished", mock.Anything,
mock.Anything, mock.Anything).Return(true, nil)
require.NoError(t, a.Tick(context.Background()))
responses = trans.sendBuffer[:len(trans.sendBuffer)]
trans.sendBuffer = trans.sendBuffer[:0]
require.Len(t, responses, 1)
require.Equal(t, schedulepb.MsgDispatchTableResponse, responses[0].MsgType)
resp, ok := responses[0].DispatchTableResponse.
Response.(*schedulepb.DispatchTableResponse_AddTable)
require.True(t, ok)
require.Equal(t, schedulepb.TableStatePrepared, resp.AddTable.Status.State)

require.NoError(t, a.Close())
}
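
TestAgentTick drives the agent end to end through a mock transport: messages queued in recvBuffer are what the agent receives on Tick, and everything it sends accumulates in sendBuffer for inspection. A sketch of that buffer-backed transport idiom, assuming the surrounding file's imports (the repository's mockTrans may differ in detail):

// mockTransport is an illustrative stand-in for the mockTrans used above.
type mockTransport struct {
	recvBuffer []*schedulepb.Message
	sendBuffer []*schedulepb.Message
}

// Send buffers outgoing messages so a test can inspect them afterwards.
func (t *mockTransport) Send(ctx context.Context, msgs []*schedulepb.Message) error {
	t.sendBuffer = append(t.sendBuffer, msgs...)
	return nil
}

// Recv hands back whatever the test queued and clears the queue.
func (t *mockTransport) Recv(ctx context.Context) ([]*schedulepb.Message, error) {
	msgs := t.recvBuffer
	t.recvBuffer = nil
	return msgs, nil
}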

// MockTableExecutor is a mock implementation of TableExecutor.
type MockTableExecutor struct {
mock.Mock
@@ -223,8 +457,8 @@ type MockTableExecutor struct {
tables map[model.TableID]pipeline.TableState
}

// NewMockTableExecutor creates a new mock table executor.
func NewMockTableExecutor() *MockTableExecutor {
// newMockTableExecutor creates a new mock table executor.
func newMockTableExecutor() *MockTableExecutor {
return &MockTableExecutor{
tables: map[model.TableID]pipeline.TableState{},
}
@@ -311,6 +545,7 @@ func (e *MockTableExecutor) IsAddTableFinished(ctx context.Context, tableID mode
func (e *MockTableExecutor) IsRemoveTableFinished(ctx context.Context, tableID model.TableID) (model.Ts, bool) {
state, ok := e.tables[tableID]
if !ok {
// the real `table executor` (the processor) would panic in such a case.
log.Warn("table to be removed is not found",
zap.Int64("tableID", tableID))
return 0, true
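
All of these tests lean on the same testify pattern: a mocked method forwards its arguments to Called, and the test pre-programs the return values with On(...).Return(...), as in mockTableExecutor.On("AddTable", ...) above. A minimal, self-contained sketch of the pattern (Executor and its AddTable signature are illustrative, not tiflow's TableExecutor interface):

package main

import (
	"context"
	"fmt"

	"github.com/stretchr/testify/mock"
)

// Executor is an illustrative mock, not the repository's MockTableExecutor.
type Executor struct {
	mock.Mock
}

// AddTable forwards its arguments to Called so the test controls the result.
func (e *Executor) AddTable(ctx context.Context, tableID int64) (bool, error) {
	args := e.Called(ctx, tableID)
	return args.Bool(0), args.Error(1)
}

func main() {
	e := &Executor{}
	// Program the expectation, as the tests above do with On/Return.
	e.On("AddTable", mock.Anything, mock.Anything).Return(true, nil)

	done, err := e.AddTable(context.Background(), 1)
	fmt.Println(done, err) // true <nil>
}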