diff --git a/cdc/model/protocol.go b/cdc/model/protocol.go index b29a67da275..ea6475135e2 100644 --- a/cdc/model/protocol.go +++ b/cdc/model/protocol.go @@ -31,9 +31,10 @@ func DispatchTableTopic(changefeedID ChangeFeedID) p2p.Topic { // DispatchTableMessage is the message body for dispatching a table. type DispatchTableMessage struct { - OwnerRev int64 `json:"owner-rev"` - ID TableID `json:"id"` - IsDelete bool `json:"is-delete"` + OwnerRev int64 `json:"owner-rev"` + Epoch ProcessorEpoch `json:"epoch"` + ID TableID `json:"id"` + IsDelete bool `json:"is-delete"` } // DispatchTableResponseTopic returns a message topic for the result of @@ -44,7 +45,8 @@ func DispatchTableResponseTopic(changefeedID ChangeFeedID) p2p.Topic { // DispatchTableResponseMessage is the message body for the result of dispatching a table. type DispatchTableResponseMessage struct { - ID TableID `json:"id"` + ID TableID `json:"id"` + Epoch ProcessorEpoch `json:"epoch"` } // AnnounceTopic returns a message topic for announcing an ownership change. @@ -64,14 +66,23 @@ func SyncTopic(changefeedID ChangeFeedID) p2p.Topic { return fmt.Sprintf("send-status-resp/%s", changefeedID) } +// ProcessorEpoch designates a continuous period of the processor working normally. +type ProcessorEpoch = string + // SyncMessage is the message body for syncing the current states of a processor. // MsgPack serialization has been implemented to minimize the size of the message. type SyncMessage struct { // Sends the processor's version for compatibility check ProcessorVersion string - Running []TableID - Adding []TableID - Removing []TableID + + // Epoch is reset to a unique value when the processor has + // encountered an internal error or other events so that + // it has to re-sync its states with the Owner. + Epoch ProcessorEpoch + + Running []TableID + Adding []TableID + Removing []TableID } // Marshal serializes the message into MsgPack format. 
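Note: a minimal, standalone sketch of the extended wire format (not part of the patch). The structs below mirror the definitions above but are re-declared locally so the snippet compiles on its own; the printed JSON matches what protocol_test.go asserts further down.

package main

import (
	"encoding/json"
	"fmt"
)

// Local mirrors of the model types, re-declared here only for illustration.
type TableID = int64
type ProcessorEpoch = string

type DispatchTableMessage struct {
	OwnerRev int64          `json:"owner-rev"`
	Epoch    ProcessorEpoch `json:"epoch"`
	ID       TableID        `json:"id"`
	IsDelete bool           `json:"is-delete"`
}

type DispatchTableResponseMessage struct {
	ID    TableID        `json:"id"`
	Epoch ProcessorEpoch `json:"epoch"`
}

func main() {
	req, _ := json.Marshal(&DispatchTableMessage{OwnerRev: 1, Epoch: "test-epoch", ID: 1, IsDelete: true})
	resp, _ := json.Marshal(&DispatchTableResponseMessage{ID: 1, Epoch: "test-epoch"})
	fmt.Println(string(req))  // {"owner-rev":1,"epoch":"test-epoch","id":1,"is-delete":true}
	fmt.Println(string(resp)) // {"id":1,"epoch":"test-epoch"}
}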
diff --git a/cdc/model/protocol_test.go b/cdc/model/protocol_test.go index 6c4654b9a72..a15e58c3cca 100644 --- a/cdc/model/protocol_test.go +++ b/cdc/model/protocol_test.go @@ -65,21 +65,23 @@ func makeVeryLargeSyncMessage() *SyncMessage { func TestMarshalDispatchTableMessage(t *testing.T) { msg := &DispatchTableMessage{ OwnerRev: 1, + Epoch: "test-epoch", ID: TableID(1), IsDelete: true, } bytes, err := json.Marshal(msg) require.NoError(t, err) - require.Equal(t, `{"owner-rev":1,"id":1,"is-delete":true}`, string(bytes)) + require.Equal(t, `{"owner-rev":1,"epoch":"test-epoch","id":1,"is-delete":true}`, string(bytes)) } func TestMarshalDispatchTableResponseMessage(t *testing.T) { msg := &DispatchTableResponseMessage{ - ID: TableID(1), + ID: TableID(1), + Epoch: "test-epoch", } bytes, err := json.Marshal(msg) require.NoError(t, err) - require.Equal(t, `{"id":1}`, string(bytes)) + require.Equal(t, `{"id":1,"epoch":"test-epoch"}`, string(bytes)) } func TestMarshalAnnounceMessage(t *testing.T) { diff --git a/cdc/owner/scheduler.go b/cdc/owner/scheduler.go index 7f9650948a1..351da5fb861 100644 --- a/cdc/owner/scheduler.go +++ b/cdc/owner/scheduler.go @@ -127,14 +127,27 @@ func (s *schedulerV2) DispatchTable( tableID model.TableID, captureID model.CaptureID, isDelete bool, + epoch model.ProcessorEpoch, ) (done bool, err error) { topic := model.DispatchTableTopic(changeFeedID) message := &model.DispatchTableMessage{ OwnerRev: ctx.GlobalVars().OwnerRevision, ID: tableID, IsDelete: isDelete, + Epoch: epoch, } + defer func() { + if err != nil { + return + } + log.Info("schedulerV2: DispatchTable", + zap.Any("message", message), + zap.Any("successful", done), + zap.String("changefeedID", changeFeedID), + zap.String("captureID", captureID)) + }() + ok, err := s.trySendMessage(ctx, captureID, topic, message) if err != nil { return false, errors.Trace(err) @@ -155,13 +168,24 @@ func (s *schedulerV2) Announce( ctx context.Context, changeFeedID model.ChangeFeedID, captureID model.CaptureID, -) (bool, error) { +) (done bool, err error) { topic := model.AnnounceTopic(changeFeedID) message := &model.AnnounceMessage{ OwnerRev: ctx.GlobalVars().OwnerRevision, OwnerVersion: version.ReleaseSemver(), } + defer func() { + if err != nil { + return + } + log.Info("schedulerV2: Announce", + zap.Any("message", message), + zap.Any("successful", done), + zap.String("changefeedID", changeFeedID), + zap.String("captureID", captureID)) + }() + ok, err := s.trySendMessage(ctx, captureID, topic, message) if err != nil { return false, errors.Trace(err) @@ -239,7 +263,7 @@ func (s *schedulerV2) registerPeerMessageHandlers(ctx context.Context) (ret erro func(sender string, messageI interface{}) error { message := messageI.(*model.DispatchTableResponseMessage) s.stats.RecordDispatchResponse() - s.OnAgentFinishedTableOperation(sender, message.ID) + s.OnAgentFinishedTableOperation(sender, message.ID, message.Epoch) return nil }) if err != nil { @@ -256,6 +280,7 @@ func (s *schedulerV2) registerPeerMessageHandlers(ctx context.Context) (ret erro s.stats.RecordSync() s.OnAgentSyncTaskStatuses( sender, + message.Epoch, message.Running, message.Adding, message.Removing) diff --git a/cdc/processor/agent.go b/cdc/processor/agent.go index 18d8762c9cb..406f3779fd2 100644 --- a/cdc/processor/agent.go +++ b/cdc/processor/agent.go @@ -17,6 +17,8 @@ import ( stdContext "context" "time" + "go.uber.org/zap/zapcore" + "github.com/benbjohnson/clock" "github.com/pingcap/errors" "github.com/pingcap/log" @@ -161,11 +163,23 @@ func (a *agentImpl) 
Tick(ctx context.Context) error { func (a *agentImpl) FinishTableOperation( ctx context.Context, tableID model.TableID, -) (bool, error) { - done, err := a.trySendMessage( + epoch model.ProcessorEpoch, +) (done bool, err error) { + message := &model.DispatchTableResponseMessage{ID: tableID, Epoch: epoch} + defer func() { + if err != nil { + return + } + log.Info("SchedulerAgent: FinishTableOperation", zap.Any("message", message), + zap.Bool("successful", done), + zap.String("changefeedID", a.changeFeed), + zap.String("ownerID", a.ownerCaptureID)) + }() + + done, err = a.trySendMessage( ctx, a.ownerCaptureID, model.DispatchTableResponseTopic(a.changeFeed), - &model.DispatchTableResponseMessage{ID: tableID}) + message) if err != nil { return false, errors.Trace(err) } @@ -173,19 +187,46 @@ func (a *agentImpl) FinishTableOperation( } func (a *agentImpl) SyncTaskStatuses( - ctx context.Context, - running, adding, removing []model.TableID, -) (bool, error) { - done, err := a.trySendMessage( + ctx context.Context, epoch model.ProcessorEpoch, adding, removing, running []model.TableID, +) (done bool, err error) { + if !a.Barrier(ctx) { + // The Sync message needs to be strongly ordered w.r.t. other messages. + return false, nil + } + + message := &model.SyncMessage{ + ProcessorVersion: version.ReleaseSemver(), + Epoch: epoch, + Running: running, + Adding: adding, + Removing: removing, + } + + defer func() { + if err != nil { + return + } + if log.GetLevel() == zapcore.DebugLevel { + // The message can be REALLY large, so we do not print it + // unless the log level is debug. + log.Debug("SchedulerAgent: SyncTaskStatuses", + zap.Any("message", message), + zap.Bool("successful", done), + zap.String("changefeedID", a.changeFeed), + zap.String("ownerID", a.ownerCaptureID)) + return + } + log.Info("SchedulerAgent: SyncTaskStatuses", + zap.Bool("successful", done), + zap.String("changefeedID", a.changeFeed), + zap.String("ownerID", a.ownerCaptureID)) + }() + + done, err = a.trySendMessage( ctx, a.ownerCaptureID, model.SyncTopic(a.changeFeed), - &model.SyncMessage{ - ProcessorVersion: version.ReleaseSemver(), - Running: running, - Adding: adding, - Removing: removing, - }) + message) if err != nil { return false, errors.Trace(err) } @@ -196,15 +237,30 @@ func (a *agentImpl) SendCheckpoint( ctx context.Context, checkpointTs model.Ts, resolvedTs model.Ts, -) (bool, error) { - done, err := a.trySendMessage( +) (done bool, err error) { + message := &model.CheckpointMessage{ + CheckpointTs: checkpointTs, + ResolvedTs: resolvedTs, + } + + defer func() { + if err != nil { + return + } + // This log is printed very frequently, so we only print it + // when the log level is debug. 
+ log.Debug("SchedulerAgent: SendCheckpoint", + zap.Any("message", message), + zap.Bool("successful", done), + zap.String("changefeedID", a.changeFeed), + zap.String("ownerID", a.ownerCaptureID)) + }() + + done, err = a.trySendMessage( ctx, a.ownerCaptureID, model.CheckpointTopic(a.changeFeed), - &model.CheckpointMessage{ - CheckpointTs: checkpointTs, - ResolvedTs: resolvedTs, - }) + message) if err != nil { return false, errors.Trace(err) } @@ -339,7 +395,8 @@ func (a *agentImpl) registerPeerMessageHandlers() (ret error) { ownerCapture, message.OwnerRev, message.ID, - message.IsDelete) + message.IsDelete, + message.Epoch) return nil }) if err != nil { diff --git a/cdc/processor/agent_test.go b/cdc/processor/agent_test.go index f6ed64d2c17..9037e55039c 100644 --- a/cdc/processor/agent_test.go +++ b/cdc/processor/agent_test.go @@ -203,6 +203,7 @@ func TestAgentBasics(t *testing.T) { case syncMsg := <-suite.syncCh: require.Equal(t, &model.SyncMessage{ ProcessorVersion: version.ReleaseSemver(), + Epoch: agent.CurrentEpoch(), Running: nil, Adding: nil, Removing: nil, @@ -211,6 +212,7 @@ func TestAgentBasics(t *testing.T) { _, err = suite.ownerMessageClient.SendMessage(suite.ctx, model.DispatchTableTopic("cf-1"), &model.DispatchTableMessage{ OwnerRev: 1, + Epoch: agent.CurrentEpoch(), ID: 1, IsDelete: false, }) @@ -263,7 +265,8 @@ func TestAgentBasics(t *testing.T) { return false case msg := <-suite.dispatchResponseCh: require.Equal(t, &model.DispatchTableResponseMessage{ - ID: 1, + ID: 1, + Epoch: agent.CurrentEpoch(), }, msg) return true default: @@ -317,6 +320,7 @@ func TestAgentNoOwnerAtStartUp(t *testing.T) { case syncMsg := <-suite.syncCh: require.Equal(t, &model.SyncMessage{ ProcessorVersion: version.ReleaseSemver(), + Epoch: agent.CurrentEpoch(), Running: nil, Adding: nil, Removing: nil, @@ -371,6 +375,7 @@ func TestAgentTolerateClientClosed(t *testing.T) { case syncMsg := <-suite.syncCh: require.Equal(t, &model.SyncMessage{ ProcessorVersion: version.ReleaseSemver(), + Epoch: agent.CurrentEpoch(), Running: nil, Adding: nil, Removing: nil, diff --git a/cdc/scheduler/agent.go b/cdc/scheduler/agent.go index 4a874283c5a..1fae1c10014 100644 --- a/cdc/scheduler/agent.go +++ b/cdc/scheduler/agent.go @@ -18,6 +18,7 @@ import ( "time" "github.com/edwingeng/deque" + "github.com/google/uuid" "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" @@ -70,12 +71,11 @@ type TableExecutor interface { // by the owner. type ProcessorMessenger interface { // FinishTableOperation notifies the owner that a table operation has finished. - FinishTableOperation(ctx context.Context, tableID model.TableID) (done bool, err error) + FinishTableOperation(ctx context.Context, tableID model.TableID, epoch model.ProcessorEpoch) (done bool, err error) // SyncTaskStatuses informs the owner of the processor's current internal state. - SyncTaskStatuses(ctx context.Context, running, adding, removing []model.TableID) (done bool, err error) + SyncTaskStatuses(ctx context.Context, epoch model.ProcessorEpoch, adding, removing, running []model.TableID) (done bool, err error) // SendCheckpoint sends the owner the processor's local watermarks, i.e., checkpoint-ts and resolved-ts. SendCheckpoint(ctx context.Context, checkpointTs model.Ts, resolvedTs model.Ts) (done bool, err error) - // Barrier returns whether there is a pending message not yet acknowledged by the owner. Barrier(ctx context.Context) (done bool) // OnOwnerChanged is called when the owner is changed. 
@@ -97,6 +97,10 @@ type BaseAgent struct { executor TableExecutor communicator ProcessorMessenger + epochMu sync.RWMutex + // epoch is reset on each Sync message. + epoch model.ProcessorEpoch + // pendingOpsMu protects pendingOps. // Note that we need a mutex because some methods are expected // to be called from a message handler goroutine. @@ -136,7 +140,7 @@ func NewBaseAgent( config *BaseAgentConfig, ) *BaseAgent { logger := log.L().With(zap.String("changefeed", changeFeedID)) - return &BaseAgent{ + ret := &BaseAgent{ pendingOps: deque.NewDeque(), tableOperations: map[model.TableID]*agentOperation{}, logger: logger, @@ -148,6 +152,8 @@ func NewBaseAgent( ownerHasChanged: atomic.NewBool(false), config: config, } + ret.resetEpoch() + return ret } type agentOperationStatus int32 @@ -161,6 +167,10 @@ const ( type agentOperation struct { TableID model.TableID IsDelete bool + Epoch model.ProcessorEpoch + + // FromOwnerID is for debugging purposes. + FromOwnerID model.CaptureID status agentOperationStatus } @@ -185,6 +195,7 @@ func (a *BaseAgent) Tick(ctx context.Context) error { } if a.needSyncNow.Load() { + a.resetEpoch() done, err := a.sendSync(ctx) if err != nil { return errors.Trace(err) } @@ -204,6 +215,12 @@ func (a *BaseAgent) Tick(ctx context.Context) error { opsToApply := a.popPendingOps() for _, op := range opsToApply { + if op.Epoch != a.getEpoch() { + a.logger.Info("dispatch request epoch does not match", + zap.String("epoch", op.Epoch), + zap.String("expectedEpoch", a.getEpoch())) + continue + } if _, ok := a.tableOperations[op.TableID]; ok { a.logger.DPanic("duplicate operation", zap.Any("op", op)) return cerrors.ErrProcessorDuplicateOperations.GenWithStackByArgs(op.TableID) @@ -259,7 +276,7 @@ func (a *BaseAgent) sendSync(ctx context.Context) (bool, error) { util.SortTableIDs(running) util.SortTableIDs(adding) util.SortTableIDs(removing) - done, err := a.communicator.SyncTaskStatuses(ctx, running, adding, removing) + done, err := a.communicator.SyncTaskStatuses(ctx, a.getEpoch(), adding, removing, running) if err != nil { return false, errors.Trace(err) } @@ -272,6 +289,7 @@ func (a *BaseAgent) processOperations(ctx context.Context) error { for tableID, op := range a.tableOperations { switch op.status { case operationReceived: + a.logger.Info("Agent start processing operation", zap.Any("op", op)) if !op.IsDelete { // add table done, err := a.executor.AddTable(ctx, op.TableID) @@ -306,7 +324,8 @@ func (a *BaseAgent) processOperations(ctx context.Context) error { op.status = operationFinished fallthrough case operationFinished: - done, err := a.communicator.FinishTableOperation(ctx, op.TableID) + a.logger.Info("Agent finish processing operation", zap.Any("op", op)) + done, err := a.communicator.FinishTableOperation(ctx, op.TableID, a.getEpoch()) if err != nil { return errors.Trace(err) } @@ -343,6 +362,7 @@ func (a *BaseAgent) OnOwnerDispatchedTask( ownerRev int64, tableID model.TableID, isDelete bool, + epoch model.ProcessorEpoch, ) { if !a.updateOwnerInfo(ownerCaptureID, ownerRev) { a.logger.Info("task from stale owner ignored", @@ -355,13 +375,15 @@ func (a *BaseAgent) OnOwnerDispatchedTask( defer a.pendingOpsMu.Unlock() op := &agentOperation{ - TableID: tableID, - IsDelete: isDelete, - status: operationReceived, + TableID: tableID, + IsDelete: isDelete, + Epoch: epoch, + FromOwnerID: ownerCaptureID, + status: operationReceived, } a.pendingOps.PushBack(op) - a.logger.Debug("OnOwnerDispatchedTask", + a.logger.Info("OnOwnerDispatchedTask", zap.String("ownerCaptureID", 
ownerCaptureID), zap.Int64("ownerRev", ownerRev), zap.Any("op", op)) @@ -456,3 +478,26 @@ func (a *BaseAgent) currentOwner() model.CaptureID { return a.ownerInfo.OwnerCaptureID } + +func (a *BaseAgent) resetEpoch() { + a.epochMu.Lock() + defer a.epochMu.Unlock() + + // We are using UUIDs because we only need uniqueness guarantee for the epoch, + // BUT NOT ordering guarantees. The reason is that the Sync messages are themselves + // barriers, so there is no need to accommodate messages from future epochs. + a.epoch = uuid.New().String() +} + +func (a *BaseAgent) getEpoch() model.ProcessorEpoch { + a.epochMu.RLock() + defer a.epochMu.RUnlock() + + return a.epoch +} + +// CurrentEpoch is a public function used in unit tests for +// checking epoch-related invariants. +func (a *BaseAgent) CurrentEpoch() model.ProcessorEpoch { + return a.getEpoch() +} diff --git a/cdc/scheduler/agent_mock.go b/cdc/scheduler/agent_mock.go index 113cd1f5141..a0d11476652 100644 --- a/cdc/scheduler/agent_mock.go +++ b/cdc/scheduler/agent_mock.go @@ -30,14 +30,14 @@ type MockProcessorMessenger struct { } // FinishTableOperation marks this function as being called. -func (m *MockProcessorMessenger) FinishTableOperation(ctx cdcContext.Context, tableID model.TableID) (bool, error) { - args := m.Called(ctx, tableID) +func (m *MockProcessorMessenger) FinishTableOperation(ctx cdcContext.Context, tableID model.TableID, epoch model.ProcessorEpoch) (bool, error) { + args := m.Called(ctx, tableID, epoch) return args.Bool(0), args.Error(1) } // SyncTaskStatuses marks this function as being called. -func (m *MockProcessorMessenger) SyncTaskStatuses(ctx cdcContext.Context, running, adding, removing []model.TableID) (bool, error) { - args := m.Called(ctx, running, adding, removing) +func (m *MockProcessorMessenger) SyncTaskStatuses(ctx cdcContext.Context, epoch model.ProcessorEpoch, adding, removing, running []model.TableID) (bool, error) { + args := m.Called(ctx, epoch, running, adding, removing) return args.Bool(0), args.Error(1) } diff --git a/cdc/scheduler/agent_test.go b/cdc/scheduler/agent_test.go index 92330ade8ae..fe3b0385688 100644 --- a/cdc/scheduler/agent_test.go +++ b/cdc/scheduler/agent_test.go @@ -31,15 +31,19 @@ func TestAgentAddTable(t *testing.T) { executor := NewMockTableExecutor(t) messenger := &MockProcessorMessenger{} agent := NewBaseAgent("test-cf", executor, messenger, agentConfigForTesting) - messenger.On("SyncTaskStatuses", mock.Anything, []model.TableID(nil), []model.TableID(nil), []model.TableID(nil)). - Return(true, nil) + var epoch model.ProcessorEpoch + messenger.On("SyncTaskStatuses", mock.Anything, mock.AnythingOfType("string"), []model.TableID(nil), []model.TableID(nil), []model.TableID(nil)). + Return(true, nil). 
+ Run(func(args mock.Arguments) { + epoch = args.String(1) + }) err := agent.Tick(ctx) require.NoError(t, err) messenger.AssertExpectations(t) executor.ExpectedCalls = nil messenger.ExpectedCalls = nil - agent.OnOwnerDispatchedTask("capture-1", 1, model.TableID(1), false) + agent.OnOwnerDispatchedTask("capture-1", 1, model.TableID(1), false, epoch) executor.On("AddTable", mock.Anything, model.TableID(1)).Return(true, nil) messenger.On("OnOwnerChanged", mock.Anything, "capture-1") @@ -53,7 +57,7 @@ func TestAgentAddTable(t *testing.T) { executor.Running[model.TableID(1)] = struct{}{} executor.On("GetCheckpoint").Return(model.Ts(1002), model.Ts(1000)) messenger.On("SendCheckpoint", mock.Anything, model.Ts(1002), model.Ts(1000)).Return(true, nil) - messenger.On("FinishTableOperation", mock.Anything, model.TableID(1)).Return(true, nil) + messenger.On("FinishTableOperation", mock.Anything, model.TableID(1), epoch).Return(true, nil) err = agent.Tick(ctx) require.NoError(t, err) @@ -81,8 +85,13 @@ func TestAgentRemoveTable(t *testing.T) { messenger := &MockProcessorMessenger{} agent := NewBaseAgent("test-cf", executor, messenger, agentConfigForTesting) agent.OnOwnerAnnounce("capture-2", 1) - messenger.On("SyncTaskStatuses", mock.Anything, []model.TableID{1, 2}, []model.TableID(nil), []model.TableID(nil)). - Return(true, nil) + + var epoch model.ProcessorEpoch + messenger.On("SyncTaskStatuses", mock.Anything, mock.AnythingOfType("string"), []model.TableID{1, 2}, []model.TableID(nil), []model.TableID(nil)). + Return(true, nil). + Run(func(args mock.Arguments) { + epoch = args.String(1) + }) messenger.On("OnOwnerChanged", mock.Anything, "capture-2") executor.On("GetCheckpoint").Return(model.Ts(1000), model.Ts(1000)) messenger.On("SendCheckpoint", mock.Anything, model.Ts(1000), model.Ts(1000)).Return(true, nil) @@ -92,7 +101,7 @@ func TestAgentRemoveTable(t *testing.T) { executor.ExpectedCalls = nil messenger.ExpectedCalls = nil - agent.OnOwnerDispatchedTask("capture-2", 1, model.TableID(1), true) + agent.OnOwnerDispatchedTask("capture-2", 1, model.TableID(1), true, epoch) executor.On("GetCheckpoint").Return(model.Ts(1000), model.Ts(1000)) messenger.On("SendCheckpoint", mock.Anything, model.Ts(1000), model.Ts(1000)).Return(true, nil) executor.On("RemoveTable", mock.Anything, model.TableID(1)).Return(true, nil) @@ -105,8 +114,17 @@ func TestAgentRemoveTable(t *testing.T) { executor.ExpectedCalls = nil messenger.ExpectedCalls = nil executor.On("GetCheckpoint").Return(model.Ts(1000), model.Ts(1000)) - messenger.On("SyncTaskStatuses", mock.Anything, []model.TableID{2}, []model.TableID(nil), []model.TableID{1}). - Return(true, nil) + messenger.On("SyncTaskStatuses", + mock.Anything, + mock.AnythingOfType("string"), + []model.TableID{2}, + []model.TableID(nil), + []model.TableID{1}, + ). + Return(true, nil). 
+ Run(func(args mock.Arguments) { + epoch = args.String(1) + }) messenger.On("OnOwnerChanged", mock.Anything, "capture-3") messenger.On("SendCheckpoint", mock.Anything, model.Ts(1000), model.Ts(1000)).Return(true, nil) messenger.On("Barrier", mock.Anything).Return(true) @@ -120,7 +138,7 @@ func TestAgentRemoveTable(t *testing.T) { delete(executor.Removing, model.TableID(1)) executor.On("GetCheckpoint").Return(model.Ts(1002), model.Ts(1000)) messenger.On("Barrier", mock.Anything).Return(true) - messenger.On("FinishTableOperation", mock.Anything, model.TableID(1)).Return(true, nil) + messenger.On("FinishTableOperation", mock.Anything, model.TableID(1), epoch).Return(true, nil) messenger.On("SendCheckpoint", mock.Anything, model.Ts(1002), model.Ts(1000)).Return(true, nil) err = agent.Tick(ctx) @@ -134,13 +152,22 @@ func TestAgentOwnerChangedWhileAddingTable(t *testing.T) { executor := NewMockTableExecutor(t) messenger := &MockProcessorMessenger{} agent := NewBaseAgent("test-cf", executor, messenger, agentConfigForTesting) - messenger.On("SyncTaskStatuses", mock.Anything, []model.TableID(nil), []model.TableID(nil), []model.TableID(nil)). - Return(true, nil) + + var epoch model.ProcessorEpoch + messenger.On("SyncTaskStatuses", + mock.Anything, + mock.AnythingOfType("string"), + []model.TableID(nil), []model.TableID(nil), []model.TableID(nil), + ). + Return(true, nil). + Run(func(args mock.Arguments) { + epoch = args.String(1) + }) err := agent.Tick(ctx) require.NoError(t, err) messenger.AssertExpectations(t) - agent.OnOwnerDispatchedTask("capture-1", 1, model.TableID(1), false) + agent.OnOwnerDispatchedTask("capture-1", 1, model.TableID(1), false, epoch) executor.On("AddTable", mock.Anything, model.TableID(1)).Return(true, nil) messenger.On("OnOwnerChanged", mock.Anything, "capture-1") @@ -161,8 +188,16 @@ func TestAgentOwnerChangedWhileAddingTable(t *testing.T) { messenger.ExpectedCalls = nil agent.OnOwnerAnnounce("capture-2", 2) messenger.On("OnOwnerChanged", mock.Anything, "capture-2") - messenger.On("SyncTaskStatuses", mock.Anything, []model.TableID(nil), []model.TableID{1}, []model.TableID(nil)). - Return(true, nil) + messenger.On( + "SyncTaskStatuses", + mock.Anything, + mock.AnythingOfType("string"), + []model.TableID(nil), []model.TableID{1}, []model.TableID(nil), + ). + Return(true, nil). + Run(func(args mock.Arguments) { + epoch = args.String(1) + }) messenger.On("Barrier", mock.Anything).Return(true) executor.On("GetCheckpoint").Return(model.Ts(1002), model.Ts(1000)) messenger.On("SendCheckpoint", mock.Anything, model.Ts(1002), model.Ts(1000)).Return(true, nil) @@ -179,13 +214,18 @@ func TestAgentReceiveFromStaleOwner(t *testing.T) { messenger := &MockProcessorMessenger{} agent := NewBaseAgent("test-cf", executor, messenger, agentConfigForTesting) agent.checkpointSender = &mockCheckpointSender{} - messenger.On("SyncTaskStatuses", mock.Anything, []model.TableID(nil), []model.TableID(nil), []model.TableID(nil)). - Return(true, nil) + + var epoch model.ProcessorEpoch + messenger.On("SyncTaskStatuses", mock.Anything, mock.AnythingOfType("string"), + []model.TableID(nil), []model.TableID(nil), []model.TableID(nil)). 
+ Return(true, nil).Run(func(args mock.Arguments) { + epoch = args.String(1) + }) err := agent.Tick(ctx) require.NoError(t, err) messenger.AssertExpectations(t) - agent.OnOwnerDispatchedTask("capture-1", 1, model.TableID(1), false) + agent.OnOwnerDispatchedTask("capture-1", 1, model.TableID(1), false, epoch) executor.On("AddTable", mock.Anything, model.TableID(1)).Return(true, nil) messenger.On("OnOwnerChanged", mock.Anything, "capture-1") @@ -197,7 +237,7 @@ func TestAgentReceiveFromStaleOwner(t *testing.T) { messenger.ExpectedCalls = nil executor.On("GetCheckpoint").Return(model.Ts(1002), model.Ts(1000)) // Stale owner - agent.OnOwnerDispatchedTask("capture-2", 0, model.TableID(2), false) + agent.OnOwnerDispatchedTask("capture-2", 0, model.TableID(2), false, defaultEpoch) err = agent.Tick(ctx) require.NoError(t, err) @@ -220,7 +260,8 @@ func TestOwnerMismatchShouldPanic(t *testing.T) { messenger := &MockProcessorMessenger{} agent := NewBaseAgent("test-cf", executor, messenger, agentConfigForTesting) agent.checkpointSender = &mockCheckpointSender{} - messenger.On("SyncTaskStatuses", mock.Anything, []model.TableID(nil), []model.TableID(nil), []model.TableID(nil)). + messenger.On("SyncTaskStatuses", mock.Anything, mock.AnythingOfType("string"), + []model.TableID(nil), []model.TableID(nil), []model.TableID(nil)). Return(true, nil) err := agent.Tick(ctx) require.NoError(t, err) @@ -239,3 +280,51 @@ func TestOwnerMismatchShouldPanic(t *testing.T) { agent.OnOwnerAnnounce("capture-2", 1) }, "should have panicked") } + +func TestIgnoreStaleEpoch(t *testing.T) { + ctx := cdcContext.NewBackendContext4Test(false) + + executor := NewMockTableExecutor(t) + messenger := &MockProcessorMessenger{} + agent := NewBaseAgent("test-cf", executor, messenger, agentConfigForTesting) + agent.checkpointSender = &mockCheckpointSender{} + + var epoch, newEpoch model.ProcessorEpoch + messenger.On("SyncTaskStatuses", mock.Anything, mock.AnythingOfType("string"), + []model.TableID(nil), []model.TableID(nil), []model.TableID(nil)). + Return(true, nil).Run(func(args mock.Arguments) { + epoch = args.String(1) + }) + + err := agent.Tick(ctx) + require.NoError(t, err) + messenger.AssertExpectations(t) + + agent.OnOwnerAnnounce("capture-1", 1) + messenger.On("OnOwnerChanged", mock.Anything, "capture-1") + + err = agent.Tick(ctx) + require.NoError(t, err) + messenger.AssertExpectations(t) + + messenger.ExpectedCalls = nil + messenger.On("OnOwnerChanged", mock.Anything, "capture-1") + messenger.On("SyncTaskStatuses", mock.Anything, mock.AnythingOfType("string"), + []model.TableID(nil), []model.TableID(nil), []model.TableID(nil)). 
+ Return(true, nil).Run(func(args mock.Arguments) { + newEpoch = args.String(1) + }) + agent.OnOwnerAnnounce("capture-1", 1) + + err = agent.Tick(ctx) + require.NoError(t, err) + messenger.AssertExpectations(t) + + require.NotEqual(t, epoch, newEpoch) + agent.OnOwnerDispatchedTask("capture-1", 1, model.TableID(2), false, epoch) + + err = agent.Tick(ctx) + require.NoError(t, err) + messenger.AssertExpectations(t) + executor.AssertNotCalled(t, "AddTable", mock.Anything, model.TableID(1)) +} diff --git a/cdc/scheduler/schedule_dispatcher.go b/cdc/scheduler/schedule_dispatcher.go index 63cde335b05..ac2de59c94c 100644 --- a/cdc/scheduler/schedule_dispatcher.go +++ b/cdc/scheduler/schedule_dispatcher.go @@ -61,7 +61,8 @@ type ScheduleDispatcherCommunicator interface { changeFeedID model.ChangeFeedID, tableID model.TableID, captureID model.CaptureID, - isDelete bool, // True when we want to remove a table from the capture. + isDelete bool, + epoch model.ProcessorEpoch, ) (done bool, err error) // Announce announces to the specified capture that the current node has become the Owner. @@ -125,6 +126,10 @@ type captureStatus struct { // dispatch a table. SyncStatus captureSyncStatus + // Epoch is reset when the processor's internal states + // have been reset. + Epoch model.ProcessorEpoch + // Watermark fields CheckpointTs model.Ts ResolvedTs model.Ts @@ -396,7 +401,9 @@ func (s *BaseScheduleDispatcher) addTable( } } - ok, err = s.communicator.DispatchTable(ctx, s.changeFeedID, tableID, target, false) + epoch := s.captureStatus[target].Epoch + ok, err = s.communicator.DispatchTable( + ctx, s.changeFeedID, tableID, target, false, epoch) if err != nil { return false, errors.Trace(err) } @@ -428,7 +435,8 @@ func (s *BaseScheduleDispatcher) removeTable( } // need to delete table captureID := record.CaptureID - ok, err = s.communicator.DispatchTable(ctx, s.changeFeedID, tableID, captureID, true) + epoch := s.captureStatus[captureID].Epoch + ok, err = s.communicator.DispatchTable(ctx, s.changeFeedID, tableID, captureID, true, epoch) if err != nil { return false, errors.Trace(err) } @@ -496,8 +504,10 @@ func (s *BaseScheduleDispatcher) rebalance(ctx context.Context) (done bool, err zap.Any("tableRecord", record)) } + epoch := s.captureStatus[record.CaptureID].Epoch // Removes the table from the current capture - ok, err := s.communicator.DispatchTable(ctx, s.changeFeedID, record.TableID, record.CaptureID, true) + ok, err := s.communicator.DispatchTable( + ctx, s.changeFeedID, record.TableID, record.CaptureID, true, epoch) if err != nil { return false, errors.Trace(err) } @@ -513,13 +523,18 @@ func (s *BaseScheduleDispatcher) rebalance(ctx context.Context) (done bool, err // OnAgentFinishedTableOperation is called when a table operation has been finished by // the processor. 
-func (s *BaseScheduleDispatcher) OnAgentFinishedTableOperation(captureID model.CaptureID, tableID model.TableID) { +func (s *BaseScheduleDispatcher) OnAgentFinishedTableOperation( + captureID model.CaptureID, + tableID model.TableID, + epoch model.ProcessorEpoch, +) { s.mu.Lock() defer s.mu.Unlock() logger := s.logger.With( zap.String("captureID", captureID), zap.Int64("tableID", tableID), + zap.String("epoch", epoch), ) if _, ok := s.captures[captureID]; !ok { @@ -527,6 +542,18 @@ func (s *BaseScheduleDispatcher) OnAgentFinishedTableOperation(captureID model.C return } + captureSt, ok := s.captureStatus[captureID] + if !ok { + logger.Warn("Message from an unknown processor, ignore") + return + } + + if captureSt.Epoch != epoch { + logger.Warn("Processor epoch does not match", + zap.String("expected", captureSt.Epoch)) + return + } + record, ok := s.tables.GetTableRecord(tableID) if !ok { logger.Warn("response about a stale table, ignore") @@ -553,12 +580,18 @@ func (s *BaseScheduleDispatcher) OnAgentFinishedTableOperation(captureID model.C } // OnAgentSyncTaskStatuses is called when the processor sends its complete current state. -func (s *BaseScheduleDispatcher) OnAgentSyncTaskStatuses(captureID model.CaptureID, running, adding, removing []model.TableID) { +func (s *BaseScheduleDispatcher) OnAgentSyncTaskStatuses( + captureID model.CaptureID, + epoch model.ProcessorEpoch, + running, adding, removing []model.TableID, +) { s.mu.Lock() defer s.mu.Unlock() logger := s.logger.With(zap.String("captureID", captureID)) - logger.Info("scheduler received sync", zap.String("captureID", captureID)) + logger.Info("scheduler received sync", + zap.String("captureID", captureID), + zap.String("epoch", epoch)) if ce := logger.Check(zap.DebugLevel, "OnAgentSyncTaskStatuses"); ce != nil { // Print this information only in debug mode. @@ -604,7 +637,9 @@ func (s *BaseScheduleDispatcher) OnAgentSyncTaskStatuses(captureID model.Capture s.tables.AddTableRecord(&util.TableRecord{TableID: tableID, CaptureID: captureID, Status: util.RemovingTable}) } - s.captureStatus[captureID].SyncStatus = captureSyncFinished + status := s.captureStatus[captureID] + status.SyncStatus = captureSyncFinished + status.Epoch = epoch } // OnAgentCheckpoint is called when the processor sends a checkpoint. 
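Note: a simplified, standalone illustration of the epoch check that OnAgentFinishedTableOperation now performs (hypothetical names, not the project code): a response is honored only if it carries the epoch recorded from that capture's most recent Sync, so acknowledgements that predate a processor restart are dropped.

package main

import "fmt"

// owner keeps, per capture, the epoch learned from its latest Sync message.
type owner struct {
	latestEpoch map[string]string
}

// onFinished reports whether a DispatchTableResponse from capture with the
// given epoch should be applied. Unknown captures and mismatched epochs are
// ignored, mirroring the two early returns added to the dispatcher above.
func (o *owner) onFinished(capture, epoch string) bool {
	current, ok := o.latestEpoch[capture]
	if !ok {
		fmt.Println("message from an unknown processor, ignore")
		return false
	}
	if current != epoch {
		fmt.Printf("processor epoch does not match: got %q, expected %q\n", epoch, current)
		return false
	}
	return true
}

func main() {
	o := &owner{latestEpoch: map[string]string{"capture-1": "epoch-2"}}
	fmt.Println(o.onFinished("capture-1", "epoch-1")) // false: stale response from before the re-sync
	fmt.Println(o.onFinished("capture-1", "epoch-2")) // true
	fmt.Println(o.onFinished("capture-9", "epoch-2")) // false: unknown capture
}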
diff --git a/cdc/scheduler/schedule_dispatcher_test.go b/cdc/scheduler/schedule_dispatcher_test.go index e9443cc78a7..3e0db461213 100644 --- a/cdc/scheduler/schedule_dispatcher_test.go +++ b/cdc/scheduler/schedule_dispatcher_test.go @@ -28,6 +28,11 @@ import ( var _ ScheduleDispatcherCommunicator = (*mockScheduleDispatcherCommunicator)(nil) +const ( + defaultEpoch = "default-epoch" + nextEpoch = "next-epoch" +) + type mockScheduleDispatcherCommunicator struct { mock.Mock addTableRecords map[model.CaptureID][]model.TableID @@ -56,20 +61,22 @@ func (m *mockScheduleDispatcherCommunicator) DispatchTable( tableID model.TableID, captureID model.CaptureID, isDelete bool, + epoch model.ProcessorEpoch, ) (done bool, err error) { if !m.isBenchmark { log.Info("dispatch table called", zap.String("changefeed", changeFeedID), zap.Int64("tableID", tableID), zap.String("captureID", captureID), - zap.Bool("isDelete", isDelete)) + zap.Bool("isDelete", isDelete), + zap.String("epoch", epoch)) if !isDelete { m.addTableRecords[captureID] = append(m.addTableRecords[captureID], tableID) } else { m.removeTableRecords[captureID] = append(m.removeTableRecords[captureID], tableID) } } - args := m.Called(ctx, changeFeedID, tableID, captureID, isDelete) + args := m.Called(ctx, changeFeedID, tableID, captureID, isDelete, epoch) return args.Bool(0), args.Error(1) } @@ -109,12 +116,12 @@ func TestDispatchTable(t *testing.T) { require.Equal(t, CheckpointCannotProceed, resolvedTs) communicator.AssertExpectations(t) - dispatcher.OnAgentSyncTaskStatuses("capture-1", []model.TableID{}, []model.TableID{}, []model.TableID{}) - dispatcher.OnAgentSyncTaskStatuses("capture-2", []model.TableID{}, []model.TableID{}, []model.TableID{}) + dispatcher.OnAgentSyncTaskStatuses("capture-1", defaultEpoch, []model.TableID{}, []model.TableID{}, []model.TableID{}) + dispatcher.OnAgentSyncTaskStatuses("capture-2", defaultEpoch, []model.TableID{}, []model.TableID{}, []model.TableID{}) communicator.Reset() // Injects a dispatch table failure - communicator.On("DispatchTable", mock.Anything, "cf-1", mock.Anything, mock.Anything, false). + communicator.On("DispatchTable", mock.Anything, "cf-1", mock.Anything, mock.Anything, false, defaultEpoch). Return(false, nil) checkpointTs, resolvedTs, err = dispatcher.Tick(ctx, 1000, []model.TableID{1, 2, 3}, defaultMockCaptureInfos) require.NoError(t, err) @@ -123,11 +130,11 @@ func TestDispatchTable(t *testing.T) { communicator.AssertExpectations(t) communicator.Reset() - communicator.On("DispatchTable", mock.Anything, "cf-1", model.TableID(1), mock.Anything, false). + communicator.On("DispatchTable", mock.Anything, "cf-1", model.TableID(1), mock.Anything, false, defaultEpoch). Return(true, nil) - communicator.On("DispatchTable", mock.Anything, "cf-1", model.TableID(2), mock.Anything, false). + communicator.On("DispatchTable", mock.Anything, "cf-1", model.TableID(2), mock.Anything, false, defaultEpoch). Return(true, nil) - communicator.On("DispatchTable", mock.Anything, "cf-1", model.TableID(3), mock.Anything, false). + communicator.On("DispatchTable", mock.Anything, "cf-1", model.TableID(3), mock.Anything, false, defaultEpoch). 
Return(true, nil) checkpointTs, resolvedTs, err = dispatcher.Tick(ctx, 1000, []model.TableID{1, 2, 3}, defaultMockCaptureInfos) require.NoError(t, err) @@ -152,7 +159,7 @@ func TestDispatchTable(t *testing.T) { for captureID, tables := range communicator.addTableRecords { for _, tableID := range tables { - dispatcher.OnAgentFinishedTableOperation(captureID, tableID) + dispatcher.OnAgentFinishedTableOperation(captureID, tableID, defaultEpoch) } } @@ -194,24 +201,24 @@ func TestSyncCaptures(t *testing.T) { require.Equal(t, CheckpointCannotProceed, checkpointTs) require.Equal(t, CheckpointCannotProceed, resolvedTs) - dispatcher.OnAgentSyncTaskStatuses("capture-1", []model.TableID{1, 2, 3}, []model.TableID{4, 5}, []model.TableID{6, 7}) + dispatcher.OnAgentSyncTaskStatuses("capture-1", defaultEpoch, []model.TableID{1, 2, 3}, []model.TableID{4, 5}, []model.TableID{6, 7}) checkpointTs, resolvedTs, err = dispatcher.Tick(ctx, 1500, []model.TableID{1, 2, 3, 4, 5}, defaultMockCaptureInfos) require.NoError(t, err) require.Equal(t, CheckpointCannotProceed, checkpointTs) require.Equal(t, CheckpointCannotProceed, resolvedTs) communicator.Reset() - dispatcher.OnAgentFinishedTableOperation("capture-1", 4) - dispatcher.OnAgentFinishedTableOperation("capture-1", 5) - dispatcher.OnAgentSyncTaskStatuses("capture-2", []model.TableID(nil), []model.TableID(nil), []model.TableID(nil)) + dispatcher.OnAgentFinishedTableOperation("capture-1", 4, defaultEpoch) + dispatcher.OnAgentFinishedTableOperation("capture-1", 5, defaultEpoch) + dispatcher.OnAgentSyncTaskStatuses("capture-2", defaultEpoch, []model.TableID(nil), []model.TableID(nil), []model.TableID(nil)) checkpointTs, resolvedTs, err = dispatcher.Tick(ctx, 1500, []model.TableID{1, 2, 3, 4, 5}, defaultMockCaptureInfos) require.NoError(t, err) require.Equal(t, CheckpointCannotProceed, checkpointTs) require.Equal(t, CheckpointCannotProceed, resolvedTs) communicator.Reset() - dispatcher.OnAgentFinishedTableOperation("capture-1", 6) - dispatcher.OnAgentFinishedTableOperation("capture-1", 7) + dispatcher.OnAgentFinishedTableOperation("capture-1", 6, defaultEpoch) + dispatcher.OnAgentFinishedTableOperation("capture-1", 7, defaultEpoch) checkpointTs, resolvedTs, err = dispatcher.Tick(ctx, 1500, []model.TableID{1, 2, 3, 4, 5}, defaultMockCaptureInfos) require.NoError(t, err) require.Equal(t, model.Ts(1500), checkpointTs) @@ -229,7 +236,7 @@ func TestSyncUnknownCapture(t *testing.T) { dispatcher.captureStatus = map[model.CaptureID]*captureStatus{} // empty capture status // Sends a sync from an unknown capture - dispatcher.OnAgentSyncTaskStatuses("capture-1", []model.TableID{1, 2, 3}, []model.TableID{4, 5}, []model.TableID{6, 7}) + dispatcher.OnAgentSyncTaskStatuses("capture-1", defaultEpoch, []model.TableID{1, 2, 3}, []model.TableID{4, 5}, []model.TableID{6, 7}) // We expect the `Sync` to be ignored. checkpointTs, resolvedTs, err := dispatcher.Tick(ctx, 1500, []model.TableID{1, 2, 3, 4, 5}, mockCaptureInfos) @@ -249,11 +256,13 @@ func TestRemoveTable(t *testing.T) { SyncStatus: captureSyncFinished, CheckpointTs: 1500, ResolvedTs: 1500, + Epoch: defaultEpoch, }, "capture-2": { SyncStatus: captureSyncFinished, CheckpointTs: 1500, ResolvedTs: 1500, + Epoch: defaultEpoch, }, } dispatcher.tables.AddTableRecord(&util.TableRecord{ @@ -278,7 +287,7 @@ func TestRemoveTable(t *testing.T) { require.Equal(t, model.Ts(1500), resolvedTs) // Inject a dispatch table failure - communicator.On("DispatchTable", mock.Anything, "cf-1", model.TableID(3), "capture-1", true). 
+ communicator.On("DispatchTable", mock.Anything, "cf-1", model.TableID(3), "capture-1", true, defaultEpoch). Return(false, nil) checkpointTs, resolvedTs, err = dispatcher.Tick(ctx, 1500, []model.TableID{1, 2}, defaultMockCaptureInfos) require.NoError(t, err) @@ -287,7 +296,7 @@ func TestRemoveTable(t *testing.T) { communicator.AssertExpectations(t) communicator.Reset() - communicator.On("DispatchTable", mock.Anything, "cf-1", model.TableID(3), "capture-1", true). + communicator.On("DispatchTable", mock.Anything, "cf-1", model.TableID(3), "capture-1", true, defaultEpoch). Return(true, nil) checkpointTs, resolvedTs, err = dispatcher.Tick(ctx, 1500, []model.TableID{1, 2}, defaultMockCaptureInfos) require.NoError(t, err) @@ -295,7 +304,7 @@ func TestRemoveTable(t *testing.T) { require.Equal(t, CheckpointCannotProceed, resolvedTs) communicator.AssertExpectations(t) - dispatcher.OnAgentFinishedTableOperation("capture-1", 3) + dispatcher.OnAgentFinishedTableOperation("capture-1", 3, defaultEpoch) communicator.Reset() checkpointTs, resolvedTs, err = dispatcher.Tick(ctx, 1500, []model.TableID{1, 2}, defaultMockCaptureInfos) require.NoError(t, err) @@ -322,11 +331,13 @@ func TestCaptureGone(t *testing.T) { SyncStatus: captureSyncFinished, CheckpointTs: 1500, ResolvedTs: 1500, + Epoch: defaultEpoch, }, "capture-2": { SyncStatus: captureSyncFinished, CheckpointTs: 1500, ResolvedTs: 1500, + Epoch: defaultEpoch, }, } dispatcher.tables.AddTableRecord(&util.TableRecord{ @@ -345,7 +356,7 @@ func TestCaptureGone(t *testing.T) { Status: util.RunningTable, }) - communicator.On("DispatchTable", mock.Anything, "cf-1", model.TableID(2), "capture-1", false). + communicator.On("DispatchTable", mock.Anything, "cf-1", model.TableID(2), "capture-1", false, defaultEpoch). Return(true, nil) checkpointTs, resolvedTs, err := dispatcher.Tick(ctx, 1500, []model.TableID{1, 2, 3}, mockCaptureInfos) require.NoError(t, err) @@ -365,11 +376,13 @@ func TestCaptureRestarts(t *testing.T) { SyncStatus: captureSyncFinished, CheckpointTs: 1500, ResolvedTs: 1500, + Epoch: defaultEpoch, }, "capture-2": { SyncStatus: captureSyncFinished, CheckpointTs: 1500, ResolvedTs: 1500, + Epoch: defaultEpoch, }, } dispatcher.tables.AddTableRecord(&util.TableRecord{ @@ -388,8 +401,8 @@ func TestCaptureRestarts(t *testing.T) { Status: util.RunningTable, }) - dispatcher.OnAgentSyncTaskStatuses("capture-2", []model.TableID{}, []model.TableID{}, []model.TableID{}) - communicator.On("DispatchTable", mock.Anything, "cf-1", model.TableID(2), "capture-2", false). + dispatcher.OnAgentSyncTaskStatuses("capture-2", nextEpoch, []model.TableID{}, []model.TableID{}, []model.TableID{}) + communicator.On("DispatchTable", mock.Anything, "cf-1", model.TableID(2), "capture-2", false, nextEpoch). Return(true, nil) checkpointTs, resolvedTs, err := dispatcher.Tick(ctx, 1500, []model.TableID{1, 2, 3}, defaultMockCaptureInfos) require.NoError(t, err) @@ -420,11 +433,13 @@ func TestCaptureGoneWhileMovingTable(t *testing.T) { SyncStatus: captureSyncFinished, CheckpointTs: 1300, ResolvedTs: 1600, + Epoch: defaultEpoch, }, "capture-2": { SyncStatus: captureSyncFinished, CheckpointTs: 1500, ResolvedTs: 1550, + Epoch: defaultEpoch, }, } dispatcher.tables.AddTableRecord(&util.TableRecord{ @@ -444,7 +459,7 @@ func TestCaptureGoneWhileMovingTable(t *testing.T) { }) dispatcher.MoveTable(1, "capture-2") - communicator.On("DispatchTable", mock.Anything, "cf-1", model.TableID(1), "capture-1", true). 
+ communicator.On("DispatchTable", mock.Anything, "cf-1", model.TableID(1), "capture-1", true, defaultEpoch). Return(true, nil) checkpointTs, resolvedTs, err := dispatcher.Tick(ctx, 1300, []model.TableID{1, 2, 3}, mockCaptureInfos) require.NoError(t, err) @@ -453,11 +468,11 @@ func TestCaptureGoneWhileMovingTable(t *testing.T) { communicator.AssertExpectations(t) delete(mockCaptureInfos, "capture-2") - dispatcher.OnAgentFinishedTableOperation("capture-1", 1) + dispatcher.OnAgentFinishedTableOperation("capture-1", 1, defaultEpoch) communicator.Reset() - communicator.On("DispatchTable", mock.Anything, "cf-1", model.TableID(1), mock.Anything, false). + communicator.On("DispatchTable", mock.Anything, "cf-1", model.TableID(1), mock.Anything, false, defaultEpoch). Return(true, nil) - communicator.On("DispatchTable", mock.Anything, "cf-1", model.TableID(2), mock.Anything, false). + communicator.On("DispatchTable", mock.Anything, "cf-1", model.TableID(2), mock.Anything, false, defaultEpoch). Return(true, nil) checkpointTs, resolvedTs, err = dispatcher.Tick(ctx, 1300, []model.TableID{1, 2, 3}, mockCaptureInfos) require.NoError(t, err) @@ -492,16 +507,19 @@ func TestRebalance(t *testing.T) { SyncStatus: captureSyncFinished, CheckpointTs: 1300, ResolvedTs: 1600, + Epoch: defaultEpoch, }, "capture-2": { SyncStatus: captureSyncFinished, CheckpointTs: 1500, ResolvedTs: 1550, + Epoch: defaultEpoch, }, "capture-3": { SyncStatus: captureSyncFinished, CheckpointTs: 1400, ResolvedTs: 1650, + Epoch: defaultEpoch, }, } for i := 1; i <= 6; i++ { @@ -513,7 +531,7 @@ func TestRebalance(t *testing.T) { } dispatcher.Rebalance() - communicator.On("DispatchTable", mock.Anything, "cf-1", mock.Anything, mock.Anything, true). + communicator.On("DispatchTable", mock.Anything, "cf-1", mock.Anything, mock.Anything, true, defaultEpoch). Return(false, nil) checkpointTs, resolvedTs, err := dispatcher.Tick(ctx, 1300, []model.TableID{1, 2, 3, 4, 5, 6}, mockCaptureInfos) require.NoError(t, err) @@ -523,7 +541,7 @@ func TestRebalance(t *testing.T) { communicator.AssertNumberOfCalls(t, "DispatchTable", 1) communicator.Reset() - communicator.On("DispatchTable", mock.Anything, "cf-1", mock.Anything, mock.Anything, true). + communicator.On("DispatchTable", mock.Anything, "cf-1", mock.Anything, mock.Anything, true, defaultEpoch). 
Return(true, nil) checkpointTs, resolvedTs, err = dispatcher.Tick(ctx, 1300, []model.TableID{1, 2, 3, 4, 5, 6}, mockCaptureInfos) require.NoError(t, err) @@ -633,11 +651,13 @@ func TestIgnoreUnsyncedCaptures(t *testing.T) { SyncStatus: captureSyncFinished, CheckpointTs: 1300, ResolvedTs: 1600, + Epoch: defaultEpoch, }, "capture-2": { SyncStatus: captureSyncSent, // not synced CheckpointTs: 1400, ResolvedTs: 1500, + Epoch: "garbage", }, } @@ -656,7 +676,7 @@ func TestIgnoreUnsyncedCaptures(t *testing.T) { require.Equal(t, CheckpointCannotProceed, resolvedTs) communicator.Reset() - dispatcher.OnAgentSyncTaskStatuses("capture-2", []model.TableID{2, 4, 6}, []model.TableID{}, []model.TableID{}) + dispatcher.OnAgentSyncTaskStatuses("capture-2", defaultEpoch, []model.TableID{2, 4, 6}, []model.TableID{}, []model.TableID{}) checkpointTs, resolvedTs, err = dispatcher.Tick(ctx, 1300, []model.TableID{1, 2, 3, 4, 5, 6}, defaultMockCaptureInfos) require.NoError(t, err) require.Equal(t, model.Ts(1300), checkpointTs) @@ -675,11 +695,13 @@ func TestRebalanceWhileAddingTable(t *testing.T) { SyncStatus: captureSyncFinished, CheckpointTs: 1300, ResolvedTs: 1600, + Epoch: defaultEpoch, }, "capture-2": { SyncStatus: captureSyncFinished, CheckpointTs: 1500, ResolvedTs: 1550, + Epoch: defaultEpoch, }, } for i := 1; i <= 6; i++ { @@ -690,7 +712,7 @@ func TestRebalanceWhileAddingTable(t *testing.T) { }) } - communicator.On("DispatchTable", mock.Anything, "cf-1", model.TableID(7), "capture-2", false). + communicator.On("DispatchTable", mock.Anything, "cf-1", model.TableID(7), "capture-2", false, defaultEpoch). Return(true, nil) checkpointTs, resolvedTs, err := dispatcher.Tick(ctx, 1300, []model.TableID{1, 2, 3, 4, 5, 6, 7}, defaultMockCaptureInfos) require.NoError(t, err) @@ -706,9 +728,9 @@ func TestRebalanceWhileAddingTable(t *testing.T) { require.Equal(t, CheckpointCannotProceed, resolvedTs) communicator.AssertExpectations(t) - dispatcher.OnAgentFinishedTableOperation("capture-2", model.TableID(7)) + dispatcher.OnAgentFinishedTableOperation("capture-2", model.TableID(7), defaultEpoch) communicator.Reset() - communicator.On("DispatchTable", mock.Anything, "cf-1", mock.Anything, mock.Anything, true). + communicator.On("DispatchTable", mock.Anything, "cf-1", mock.Anything, mock.Anything, true, defaultEpoch). Return(true, nil) checkpointTs, resolvedTs, err = dispatcher.Tick(ctx, 1300, []model.TableID{1, 2, 3, 4, 5, 6, 7}, defaultMockCaptureInfos) require.NoError(t, err) @@ -729,11 +751,13 @@ func TestManualMoveTableWhileAddingTable(t *testing.T) { SyncStatus: captureSyncFinished, CheckpointTs: 1300, ResolvedTs: 1600, + Epoch: defaultEpoch, }, "capture-2": { SyncStatus: captureSyncFinished, CheckpointTs: 1500, ResolvedTs: 1550, + Epoch: defaultEpoch, }, } dispatcher.tables.AddTableRecord(&util.TableRecord{ @@ -747,7 +771,7 @@ func TestManualMoveTableWhileAddingTable(t *testing.T) { Status: util.RunningTable, }) - communicator.On("DispatchTable", mock.Anything, "cf-1", model.TableID(1), "capture-2", false). + communicator.On("DispatchTable", mock.Anything, "cf-1", model.TableID(1), "capture-2", false, defaultEpoch). 
Return(true, nil) checkpointTs, resolvedTs, err := dispatcher.Tick(ctx, 1300, []model.TableID{1, 2, 3}, defaultMockCaptureInfos) require.NoError(t, err) @@ -761,9 +785,9 @@ func TestManualMoveTableWhileAddingTable(t *testing.T) { require.Equal(t, CheckpointCannotProceed, resolvedTs) communicator.AssertExpectations(t) - dispatcher.OnAgentFinishedTableOperation("capture-2", 1) + dispatcher.OnAgentFinishedTableOperation("capture-2", 1, defaultEpoch) communicator.Reset() - communicator.On("DispatchTable", mock.Anything, "cf-1", model.TableID(1), "capture-2", true). + communicator.On("DispatchTable", mock.Anything, "cf-1", model.TableID(1), "capture-2", true, defaultEpoch). Return(true, nil) checkpointTs, resolvedTs, err = dispatcher.Tick(ctx, 1300, []model.TableID{1, 2, 3}, defaultMockCaptureInfos) require.NoError(t, err) @@ -771,9 +795,9 @@ func TestManualMoveTableWhileAddingTable(t *testing.T) { require.Equal(t, CheckpointCannotProceed, resolvedTs) communicator.AssertExpectations(t) - dispatcher.OnAgentFinishedTableOperation("capture-2", 1) + dispatcher.OnAgentFinishedTableOperation("capture-2", 1, defaultEpoch) communicator.Reset() - communicator.On("DispatchTable", mock.Anything, "cf-1", model.TableID(1), "capture-1", false). + communicator.On("DispatchTable", mock.Anything, "cf-1", model.TableID(1), "capture-1", false, defaultEpoch). Return(true, nil) checkpointTs, resolvedTs, err = dispatcher.Tick(ctx, 1300, []model.TableID{1, 2, 3}, defaultMockCaptureInfos) require.NoError(t, err) @@ -817,15 +841,15 @@ func TestAutoRebalanceOnCaptureOnline(t *testing.T) { require.Equal(t, CheckpointCannotProceed, resolvedTs) communicator.AssertExpectations(t) - dispatcher.OnAgentSyncTaskStatuses("capture-1", []model.TableID{}, []model.TableID{}, []model.TableID{}) - dispatcher.OnAgentSyncTaskStatuses("capture-2", []model.TableID{}, []model.TableID{}, []model.TableID{}) + dispatcher.OnAgentSyncTaskStatuses("capture-1", defaultEpoch, []model.TableID{}, []model.TableID{}, []model.TableID{}) + dispatcher.OnAgentSyncTaskStatuses("capture-2", defaultEpoch, []model.TableID{}, []model.TableID{}, []model.TableID{}) communicator.Reset() - communicator.On("DispatchTable", mock.Anything, "cf-1", model.TableID(1), mock.Anything, false). + communicator.On("DispatchTable", mock.Anything, "cf-1", model.TableID(1), mock.Anything, false, defaultEpoch). Return(true, nil) - communicator.On("DispatchTable", mock.Anything, "cf-1", model.TableID(2), mock.Anything, false). + communicator.On("DispatchTable", mock.Anything, "cf-1", model.TableID(2), mock.Anything, false, defaultEpoch). Return(true, nil) - communicator.On("DispatchTable", mock.Anything, "cf-1", model.TableID(3), mock.Anything, false). + communicator.On("DispatchTable", mock.Anything, "cf-1", model.TableID(3), mock.Anything, false, defaultEpoch). 
Return(true, nil) checkpointTs, resolvedTs, err = dispatcher.Tick(ctx, 1000, []model.TableID{1, 2, 3}, captureList) require.NoError(t, err) @@ -861,7 +885,7 @@ func TestAutoRebalanceOnCaptureOnline(t *testing.T) { communicator.AssertExpectations(t) communicator.ExpectedCalls = nil - dispatcher.OnAgentSyncTaskStatuses("capture-3", []model.TableID{}, []model.TableID{}, []model.TableID{}) + dispatcher.OnAgentSyncTaskStatuses("capture-3", defaultEpoch, []model.TableID{}, []model.TableID{}, []model.TableID{}) checkpointTs, resolvedTs, err = dispatcher.Tick(ctx, 1000, []model.TableID{1, 2, 3}, captureList) require.NoError(t, err) require.Equal(t, CheckpointCannotProceed, checkpointTs) @@ -870,13 +894,13 @@ func TestAutoRebalanceOnCaptureOnline(t *testing.T) { for captureID, tables := range communicator.addTableRecords { for _, tableID := range tables { - dispatcher.OnAgentFinishedTableOperation(captureID, tableID) + dispatcher.OnAgentFinishedTableOperation(captureID, tableID, defaultEpoch) } } communicator.Reset() var removeTableFromCapture model.CaptureID - communicator.On("DispatchTable", mock.Anything, "cf-1", mock.Anything, mock.Anything, true). + communicator.On("DispatchTable", mock.Anything, "cf-1", mock.Anything, mock.Anything, true, defaultEpoch). Return(true, nil).Run(func(args mock.Arguments) { removeTableFromCapture = args.Get(3).(model.CaptureID) }) @@ -888,11 +912,11 @@ func TestAutoRebalanceOnCaptureOnline(t *testing.T) { removedTableID := communicator.removeTableRecords[removeTableFromCapture][0] - dispatcher.OnAgentFinishedTableOperation(removeTableFromCapture, removedTableID) + dispatcher.OnAgentFinishedTableOperation(removeTableFromCapture, removedTableID, defaultEpoch) dispatcher.OnAgentCheckpoint("capture-1", 1100, 1400) dispatcher.OnAgentCheckpoint("capture-2", 1200, 1300) communicator.ExpectedCalls = nil - communicator.On("DispatchTable", mock.Anything, "cf-1", removedTableID, "capture-3", false). + communicator.On("DispatchTable", mock.Anything, "cf-1", removedTableID, "capture-3", false, defaultEpoch). Return(true, nil) checkpointTs, resolvedTs, err = dispatcher.Tick(ctx, 1000, []model.TableID{1, 2, 3}, captureList) require.NoError(t, err) @@ -901,6 +925,80 @@ func TestAutoRebalanceOnCaptureOnline(t *testing.T) { communicator.AssertExpectations(t) } +func TestInvalidFinishedTableOperation(t *testing.T) { + t.Parallel() + + ctx := cdcContext.NewBackendContext4Test(false) + communicator := NewMockScheduleDispatcherCommunicator() + dispatcher := NewBaseScheduleDispatcher("cf-1", communicator, 1000) + dispatcher.captureStatus = map[model.CaptureID]*captureStatus{ + "capture-1": { + SyncStatus: captureSyncFinished, + CheckpointTs: 1300, + ResolvedTs: 1600, + Epoch: defaultEpoch, + }, + "capture-2": { + SyncStatus: captureSyncFinished, + CheckpointTs: 1500, + ResolvedTs: 1550, + Epoch: defaultEpoch, + }, + } + dispatcher.tables.AddTableRecord(&util.TableRecord{ + TableID: 2, + CaptureID: "capture-1", + Status: util.RunningTable, + }) + dispatcher.tables.AddTableRecord(&util.TableRecord{ + TableID: 3, + CaptureID: "capture-1", + Status: util.RunningTable, + }) + + communicator.On("DispatchTable", mock.Anything, "cf-1", model.TableID(1), "capture-2", false, defaultEpoch). 
+ Return(true, nil) + checkpointTs, resolvedTs, err := dispatcher.Tick(ctx, 1300, []model.TableID{1, 2, 3}, defaultMockCaptureInfos) + require.NoError(t, err) + require.Equal(t, CheckpointCannotProceed, checkpointTs) + require.Equal(t, CheckpointCannotProceed, resolvedTs) + + // Invalid epoch + dispatcher.OnAgentFinishedTableOperation("capture-2", model.TableID(1), "invalid-epoch") + checkpointTs, resolvedTs, err = dispatcher.Tick(ctx, 1300, []model.TableID{1, 2, 3}, defaultMockCaptureInfos) + require.NoError(t, err) + require.Equal(t, CheckpointCannotProceed, checkpointTs) + require.Equal(t, CheckpointCannotProceed, resolvedTs) + record, ok := dispatcher.tables.GetTableRecord(model.TableID(1)) + require.True(t, ok) + require.Equal(t, record.Status, util.AddingTable) + + // Invalid capture + dispatcher.OnAgentFinishedTableOperation("capture-invalid", model.TableID(1), defaultEpoch) + checkpointTs, resolvedTs, err = dispatcher.Tick(ctx, 1300, []model.TableID{1, 2, 3}, defaultMockCaptureInfos) + require.NoError(t, err) + require.Equal(t, CheckpointCannotProceed, checkpointTs) + require.Equal(t, CheckpointCannotProceed, resolvedTs) + record, ok = dispatcher.tables.GetTableRecord(model.TableID(1)) + require.True(t, ok) + require.Equal(t, record.Status, util.AddingTable) + + // Invalid table + dispatcher.OnAgentFinishedTableOperation("capture-1", model.TableID(999), defaultEpoch) + checkpointTs, resolvedTs, err = dispatcher.Tick(ctx, 1300, []model.TableID{1, 2, 3}, defaultMockCaptureInfos) + require.NoError(t, err) + require.Equal(t, CheckpointCannotProceed, checkpointTs) + require.Equal(t, CheckpointCannotProceed, resolvedTs) + record, ok = dispatcher.tables.GetTableRecord(model.TableID(1)) + require.True(t, ok) + require.Equal(t, record.Status, util.AddingTable) + + // Capture not matching + require.Panics(t, func() { + dispatcher.OnAgentFinishedTableOperation("capture-1", model.TableID(1), defaultEpoch) + }) +} + func BenchmarkAddTable(b *testing.B) { ctx := cdcContext.NewBackendContext4Test(false) diff --git a/errors.toml b/errors.toml index 53da43512cf..9d7ba10d022 100755 --- a/errors.toml +++ b/errors.toml @@ -663,7 +663,7 @@ peer-to-peer message client has failed permanently, no need to reconnect: %s ["CDC:ErrPeerMessageDataLost"] error = ''' -peer-to-peer message data lost, topic: %s, seq: %s +peer-to-peer message data lost, topic: %s, seq: %d ''' ["CDC:ErrPeerMessageDecodeError"] diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 4a37306b2c3..41595b7e3fd 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -281,7 +281,7 @@ var ( ErrPeerMessageStaleConnection = errors.Normalize("peer-to-peer message stale connection: old-epoch %d, new-epoch %d", errors.RFCCodeText("CDC:ErrPeerMessageStaleConnection")) ErrPeerMessageDuplicateConnection = errors.Normalize("peer-to-peer message duplicate connection: epoch %d", errors.RFCCodeText("CDC:ErrPeerMessageDuplicateConnection")) ErrPeerMessageServerClosed = errors.Normalize("peer-to-peer message server has closed connection: %s.", errors.RFCCodeText("CDC:ErrPeerMessageServerClosed")) - ErrPeerMessageDataLost = errors.Normalize("peer-to-peer message data lost, topic: %s, seq: %s", errors.RFCCodeText("CDC:ErrPeerMessageDataLost")) + ErrPeerMessageDataLost = errors.Normalize("peer-to-peer message data lost, topic: %s, seq: %d", errors.RFCCodeText("CDC:ErrPeerMessageDataLost")) ErrPeerMessageToManyPeers = errors.Normalize("peer-to-peer message server got too many peers: %d peers", 
errors.RFCCodeText("CDC:ErrPeerMessageToManyPeers")) ErrPeerMessageDecodeError = errors.Normalize("failed to decode peer-to-peer message", errors.RFCCodeText("CDC:ErrPeerMessageDecodeError")) ErrPeerMessageTaskQueueCongested = errors.Normalize("peer-to-peer message server has too many pending tasks", errors.RFCCodeText("CDC:ErrPeerMessageTaskQueueCongested"))
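Note: the %s to %d change for ErrPeerMessageDataLost matters because the sequence number is an int64; with the old verb, fmt prints a Go-syntax error marker instead of the number. A quick standalone check (illustrative values):

package main

import "fmt"

func main() {
	var seq int64 = 42
	// Old verb: the number is not formatted, a %!s marker is emitted instead.
	fmt.Printf("peer-to-peer message data lost, topic: %s, seq: %s\n", "topic-1", seq)
	// Output: peer-to-peer message data lost, topic: topic-1, seq: %!s(int64=42)

	// New verb: the sequence number is rendered as expected.
	fmt.Printf("peer-to-peer message data lost, topic: %s, seq: %d\n", "topic-1", seq)
	// Output: peer-to-peer message data lost, topic: topic-1, seq: 42
}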