Skip to content

Commit

Permalink
fix stopping case.
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Jun 12, 2022
1 parent a673b6a commit 7068f9c
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 18 deletions.
22 changes: 15 additions & 7 deletions cdc/scheduler/internal/tp/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ func (a *agent) handleMessage(msg []*schedulepb.Message) []*schedulepb.Message {
response := a.handleMessageHeartbeat(message.Heartbeat.GetTableIDs())
result = append(result, response)
case schedulepb.MsgDispatchTableRequest:
a.handleMessageDispatchTableRequest(message.DispatchTableRequest, processorEpoch)
response := a.handleMessageDispatchTableRequest(message.DispatchTableRequest, processorEpoch)
result = append(result, response)
default:
log.Warn("tpscheduler: unknown message received",
zap.String("capture", a.captureID),
Expand Down Expand Up @@ -304,7 +305,7 @@ type dispatchTableTask struct {
func (a *agent) handleMessageDispatchTableRequest(
request *schedulepb.DispatchTableRequest,
epoch schedulepb.ProcessorEpoch,
) {
) *schedulepb.Message {
if a.epoch != epoch {
log.Info("tpscheduler: agent receive dispatch table request "+
"epoch does not match, ignore it",
Expand All @@ -313,7 +314,7 @@ func (a *agent) handleMessageDispatchTableRequest(
zap.String("changefeed", a.changeFeedID.ID),
zap.String("epoch", epoch.Epoch),
zap.String("expected", a.epoch.Epoch))
return
return nil
}
var (
table *table
Expand All @@ -324,15 +325,21 @@ func (a *agent) handleMessageDispatchTableRequest(
// this should be guaranteed by the caller of this method.
switch req := request.Request.(type) {
case *schedulepb.DispatchTableRequest_AddTable:
tableID := req.AddTable.GetTableID()
if a.stopping {
log.Info("tpscheduler: agent is stopping, and decline handle add table request",
zap.String("capture", a.captureID),
zap.String("namespace", a.changeFeedID.Namespace),
zap.String("changefeed", a.changeFeedID.ID),
zap.Any("request", request))
return
table, ok := a.tables[tableID]
if !ok {
table = newTable(tableID, a.tableExec)
}
_ = table.refresh()
return newAddTableResponseMessage(table.status, true)
}
tableID := req.AddTable.GetTableID()

task = &dispatchTableTask{
TableID: tableID,
StartTs: req.AddTable.GetCheckpoint().CheckpointTs,
Expand All @@ -357,7 +364,7 @@ func (a *agent) handleMessageDispatchTableRequest(
zap.String("namespace", a.changeFeedID.Namespace),
zap.String("changefeed", a.changeFeedID.ID),
zap.Any("request", request))
return
return nil
}
task = &dispatchTableTask{
TableID: tableID,
Expand All @@ -371,9 +378,10 @@ func (a *agent) handleMessageDispatchTableRequest(
zap.String("namespace", a.changeFeedID.Namespace),
zap.String("changefeed", a.changeFeedID.ID),
zap.Any("request", request))
return
return nil
}
table.injectDispatchTableTask(task)
return nil
}

// GetLastSentCheckpointTs implement agent interface
Expand Down
12 changes: 1 addition & 11 deletions cdc/scheduler/internal/tp/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,6 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) {
a.handleMessageDispatchTableRequest(addTableRequest, processorEpoch)
responses, err = a.poll(ctx)
require.NoError(t, err)
require.Len(t, responses, 1)

addTableResponse, ok = responses[0].DispatchTableResponse.
Response.(*schedulepb.DispatchTableResponse_AddTable)
require.True(t, ok)
require.Equal(t, model.TableID(1), addTableResponse.AddTable.Status.TableID)
require.Equal(t, schedulepb.TableStatePreparing, addTableResponse.AddTable.Status.State)
require.Contains(t, a.tables, model.TableID(1))

mockTableExecutor.ExpectedCalls = mockTableExecutor.ExpectedCalls[:1]
mockTableExecutor.On("IsAddTableFinished", mock.Anything,
Expand Down Expand Up @@ -585,9 +577,7 @@ func TestAgentTick(t *testing.T) {
mockTableExecutor.On("IsAddTableFinished", mock.Anything,
mock.Anything, mock.Anything).Return(false, nil)
require.NoError(t, a.Tick(ctx))
responses := trans.sendBuffer[:len(trans.sendBuffer)]
trans.sendBuffer = trans.sendBuffer[:0]
require.Equal(t, schedulepb.MsgHeartbeatResponse, responses[0].MsgType)

messages = messages[:0]
messages = append(messages, addTableRequest)
Expand All @@ -597,7 +587,7 @@ func TestAgentTick(t *testing.T) {
mockTableExecutor.On("IsAddTableFinished", mock.Anything,
mock.Anything, mock.Anything).Return(true, nil)
require.NoError(t, a.Tick(ctx))
responses = trans.sendBuffer[:len(trans.sendBuffer)]
responses := trans.sendBuffer[:len(trans.sendBuffer)]
trans.sendBuffer = trans.sendBuffer[:0]
require.Len(t, responses, 1)
require.Equal(t, schedulepb.MsgDispatchTableResponse, responses[0].MsgType)
Expand Down

0 comments on commit 7068f9c

Please sign in to comment.