diff --git a/maintainer/barrier_event.go b/maintainer/barrier_event.go index 2700066de..796b0fe93 100644 --- a/maintainer/barrier_event.go +++ b/maintainer/barrier_event.go @@ -38,9 +38,10 @@ type BarrierEvent struct { blockedDispatchers *heartbeatpb.InfluencedTables dropDispatchers *heartbeatpb.InfluencedTables - blockedTasks []*scheduler.StateMachine[common.DispatcherID] - dropTasks []*scheduler.StateMachine[common.DispatcherID] - newTables []*heartbeatpb.Table + blockedTasks []*scheduler.StateMachine[common.DispatcherID] + dropTasks []*scheduler.StateMachine[common.DispatcherID] + newTables []*heartbeatpb.Table + schemaIDChange []*heartbeatpb.SchemaIDChange advancedDispatchers map[common.DispatcherID]bool lastResendTime time.Time @@ -56,6 +57,7 @@ func NewBlockEvent(cfID string, scheduler *Scheduler, blockedDispatchers: status.BlockTables, newTables: status.NeedAddedTables, dropDispatchers: status.NeedDroppedTables, + schemaIDChange: status.UpdatedSchemas, advancedDispatchers: make(map[common.DispatcherID]bool), lastResendTime: time.Time{}, } @@ -101,6 +103,15 @@ func (b *BarrierEvent) scheduleBlockEvent() { TableID: add.TableID, }, b.commitTs) } + + for _, change := range b.schemaIDChange { + log.Info("update schema id", + zap.String("changefeed", b.cfID), + zap.Int64("newSchema", change.OldSchemaID), + zap.Int64("oldSchema", change.NewSchemaID), + zap.Int64("table", change.TableID)) + b.scheduler.UpdateSchemaID(change.TableID, change.NewSchemaID) + } } func (b *BarrierEvent) allDispatcherReported() bool { diff --git a/maintainer/barrier_event_test.go b/maintainer/barrier_event_test.go index 61dfa12f7..ccd8d9b91 100644 --- a/maintainer/barrier_event_test.go +++ b/maintainer/barrier_event_test.go @@ -160,3 +160,31 @@ func TestResendAction(t *testing.T) { require.Equal(t, resp.DispatcherStatuses[0].Action.Action, heartbeatpb.Action_Pass) require.Equal(t, resp.DispatcherStatuses[0].Action.CommitTs, uint64(10)) } + +func TestUpdateSchemaID(t *testing.T) { + sche := NewScheduler("test", 1, nil, nil, nil, 1000, 0) + sche.AddNewNode("node1") + sche.AddNewTable(common.Table{1, 1}, 1) + require.Len(t, sche.Absent(), 1) + require.Len(t, sche.GetTasksBySchemaID(1), 1) + event := NewBlockEvent("test", sche, &heartbeatpb.State{ + IsBlocked: true, + BlockTs: 10, + BlockTables: &heartbeatpb.InfluencedTables{ + InfluenceType: heartbeatpb.InfluenceType_All, + }, + UpdatedSchemas: []*heartbeatpb.SchemaIDChange{ + { + TableID: 1, + OldSchemaID: 1, + NewSchemaID: 2, + }, + }}, + ) + event.scheduleBlockEvent() + require.Len(t, sche.Absent(), 1) + // check the schema id and map is updated + require.Len(t, sche.GetTasksBySchemaID(1), 0) + require.Len(t, sche.GetTasksBySchemaID(2), 1) + require.Equal(t, sche.GetTasksByTableIDs(1)[0].Inferior.(*ReplicaSet).SchemaID, int64(2)) +} diff --git a/maintainer/barrier_test.go b/maintainer/barrier_test.go index f55502478..a34f1e088 100644 --- a/maintainer/barrier_test.go +++ b/maintainer/barrier_test.go @@ -24,8 +24,6 @@ import ( "github.com/flowbehappy/tigate/pkg/messaging" "github.com/flowbehappy/tigate/scheduler" "github.com/pingcap/log" - "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/spanz" "github.com/stretchr/testify/require" "go.uber.org/zap" ) @@ -36,22 +34,15 @@ func TestNormalBlock(t *testing.T) { sche.AddNewNode("node2") var blockedDispatcherIDS []*heartbeatpb.DispatcherID for id := 1; id < 4; id++ { - span := spanz.TableIDToComparableSpan(int64(id)) - tableSpan := &heartbeatpb.TableSpan{ - TableID: int64(id), - StartKey: span.StartKey, - EndKey: span.EndKey, - } - dispatcherID := common.NewDispatcherID() - blockedDispatcherIDS = append(blockedDispatcherIDS, dispatcherID.ToPB()) - replicaSet := NewReplicaSet(model.DefaultChangeFeedID("test"), dispatcherID, 1, tableSpan, 0) - stm := scheduler.NewStateMachine(dispatcherID, nil, replicaSet) - stm.State = scheduler.SchedulerStatusWorking - sche.Working()[dispatcherID] = stm + sche.AddNewTable(common.Table{1, int64(id)}, 0) + stm := sche.GetTasksByTableIDs(int64(id))[0] + blockedDispatcherIDS = append(blockedDispatcherIDS, stm.ID.ToPB()) stm.Primary = "node1" - sche.nodeTasks["node1"][dispatcherID] = stm + stm.State = scheduler.SchedulerStatusWorking + sche.tryMoveTask(stm.ID, stm, scheduler.SchedulerStatusAbsent, "", true) } + // the last one is the writer var selectDispatcherID = common.NewDispatcherIDFromPB(blockedDispatcherIDS[2]) sche.nodeTasks["node2"][selectDispatcherID] = sche.nodeTasks["node1"][selectDispatcherID] dropID := sche.nodeTasks["node2"][selectDispatcherID].Inferior.(*ReplicaSet).Span.TableID diff --git a/maintainer/scheduler.go b/maintainer/scheduler.go index ba20df08e..d0c1d1805 100644 --- a/maintainer/scheduler.go +++ b/maintainer/scheduler.go @@ -45,7 +45,7 @@ type Scheduler struct { // group the tasks by schema id schemaTasks map[int64]map[common.DispatcherID]*scheduler.StateMachine[common.DispatcherID] // tables - tableTasks map[int64]struct{} + tableTasks map[int64]map[common.DispatcherID]*scheduler.StateMachine[common.DispatcherID] // totalMaps holds all state maps, absent, committing, working and removing totalMaps []map[common.DispatcherID]*scheduler.StateMachine[common.DispatcherID] bootstrapped bool @@ -71,7 +71,7 @@ func NewScheduler(changefeedID string, s := &Scheduler{ nodeTasks: make(map[node.ID]map[common.DispatcherID]*scheduler.StateMachine[common.DispatcherID]), schemaTasks: make(map[int64]map[common.DispatcherID]*scheduler.StateMachine[common.DispatcherID]), - tableTasks: make(map[int64]struct{}), + tableTasks: make(map[int64]map[common.DispatcherID]*scheduler.StateMachine[common.DispatcherID]), startCheckpointTs: checkpointTs, changefeedID: changefeedID, bootstrapped: false, @@ -106,8 +106,8 @@ func (s *Scheduler) GetAllNodes() []node.ID { } func (s *Scheduler) AddNewTable(table common.Table, startTs uint64) { - _, ok := s.tableTasks[table.TableID] - if ok { + tables, ok := s.tableTasks[table.TableID] + if ok && len(tables) > 0 { log.Warn("table already add, ignore", zap.String("changefeed", s.changefeedID), zap.Int64("schema", table.SchemaID), @@ -215,23 +215,43 @@ func (s *Scheduler) RemoveTask(stm *scheduler.StateMachine[common.DispatcherID]) } func (s *Scheduler) GetTasksByTableIDs(tableIDs ...int64) []*scheduler.StateMachine[common.DispatcherID] { - tableMap := make(map[int64]bool, len(tableIDs)) - for _, tableID := range tableIDs { - tableMap[tableID] = true - } var stms []*scheduler.StateMachine[common.DispatcherID] - for _, m := range s.totalMaps { - for _, stm := range m { - replica := stm.Inferior.(*ReplicaSet) - if !tableMap[replica.Span.TableID] { - continue - } + for _, tableID := range tableIDs { + for _, stm := range s.tableTasks[tableID] { stms = append(stms, stm) } } return stms } +// UpdateSchemaID will update the schema id of the table, and move the task to the new schema map +// it called when rename a table to another schema +func (s *Scheduler) UpdateSchemaID(tableID, newSchemaID int64) { + for _, stm := range s.tableTasks[tableID] { + replicaSet := stm.Inferior.(*ReplicaSet) + oldSchemaID := replicaSet.SchemaID + // update schemaID + replicaSet.SchemaID = newSchemaID + + //update schema map + schemaMap, ok := s.schemaTasks[oldSchemaID] + if ok { + delete(schemaMap, stm.ID) + //clear the map if empty + if len(schemaMap) == 0 { + delete(s.schemaTasks, oldSchemaID) + } + } + // add it to new schema map + newMap, ok := s.schemaTasks[newSchemaID] + if !ok { + newMap = make(map[common.DispatcherID]*scheduler.StateMachine[common.DispatcherID]) + s.schemaTasks[newSchemaID] = newMap + } + newMap[stm.ID] = stm + } +} + func (s *Scheduler) AddNewNode(id node.ID) { _, ok := s.nodeTasks[id] if ok { @@ -550,13 +570,21 @@ func (s *Scheduler) addNewSpans(schemaID, tableID int64, dispatcherID, schemaID, newTableSpan, startTs).(*ReplicaSet) stm := scheduler.NewStateMachine(dispatcherID, nil, replicaSet) s.Absent()[dispatcherID] = stm + // modify the schema map schemaMap, ok := s.schemaTasks[schemaID] if !ok { schemaMap = make(map[common.DispatcherID]*scheduler.StateMachine[common.DispatcherID]) s.schemaTasks[schemaID] = schemaMap } schemaMap[dispatcherID] = stm - s.tableTasks[tableID] = struct{}{} + + // modify the table map + tableMap, ok := s.tableTasks[tableID] + if !ok { + tableMap = make(map[common.DispatcherID]*scheduler.StateMachine[common.DispatcherID]) + s.tableTasks[tableID] = tableMap + } + tableMap[dispatcherID] = stm } } @@ -578,7 +606,13 @@ func (s *Scheduler) tryMoveTask(dispatcherID common.DispatcherID, delete(m, dispatcherID) } delete(s.schemaTasks[stm.Inferior.(*ReplicaSet).SchemaID], dispatcherID) - delete(s.tableTasks, stm.Inferior.(*ReplicaSet).Span.TableID) + if len(s.schemaTasks[stm.Inferior.(*ReplicaSet).SchemaID]) == 0 { + delete(s.schemaTasks, stm.Inferior.(*ReplicaSet).SchemaID) + } + delete(s.tableTasks[stm.Inferior.(*ReplicaSet).Span.TableID], dispatcherID) + if len(s.tableTasks[stm.Inferior.(*ReplicaSet).Span.TableID]) == 0 { + delete(s.tableTasks, stm.Inferior.(*ReplicaSet).Span.TableID) + } } // keep node task map is updated if modifyNodeMap && oldPrimary != stm.Primary {