Skip to content

Commit

Permalink
support update table schema id
Browse files Browse the repository at this point in the history
  • Loading branch information
sdojjy committed Oct 9, 2024
1 parent 45240db commit 3557c88
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 34 deletions.
17 changes: 14 additions & 3 deletions maintainer/barrier_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ type BarrierEvent struct {
blockedDispatchers *heartbeatpb.InfluencedTables
dropDispatchers *heartbeatpb.InfluencedTables

blockedTasks []*scheduler.StateMachine[common.DispatcherID]
dropTasks []*scheduler.StateMachine[common.DispatcherID]
newTables []*heartbeatpb.Table
blockedTasks []*scheduler.StateMachine[common.DispatcherID]
dropTasks []*scheduler.StateMachine[common.DispatcherID]
newTables []*heartbeatpb.Table
schemaIDChange []*heartbeatpb.SchemaIDChange

advancedDispatchers map[common.DispatcherID]bool
lastResendTime time.Time
Expand All @@ -56,6 +57,7 @@ func NewBlockEvent(cfID string, scheduler *Scheduler,
blockedDispatchers: status.BlockTables,
newTables: status.NeedAddedTables,
dropDispatchers: status.NeedDroppedTables,
schemaIDChange: status.UpdatedSchemas,
advancedDispatchers: make(map[common.DispatcherID]bool),
lastResendTime: time.Time{},
}
Expand Down Expand Up @@ -101,6 +103,15 @@ func (b *BarrierEvent) scheduleBlockEvent() {
TableID: add.TableID,
}, b.commitTs)
}

for _, change := range b.schemaIDChange {
log.Info("update schema id",
zap.String("changefeed", b.cfID),
zap.Int64("newSchema", change.OldSchemaID),
zap.Int64("oldSchema", change.NewSchemaID),
zap.Int64("table", change.TableID))
b.scheduler.UpdateSchemaID(change.TableID, change.NewSchemaID)
}
}

func (b *BarrierEvent) allDispatcherReported() bool {
Expand Down
28 changes: 28 additions & 0 deletions maintainer/barrier_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,3 +160,31 @@ func TestResendAction(t *testing.T) {
require.Equal(t, resp.DispatcherStatuses[0].Action.Action, heartbeatpb.Action_Pass)
require.Equal(t, resp.DispatcherStatuses[0].Action.CommitTs, uint64(10))
}

func TestUpdateSchemaID(t *testing.T) {
sche := NewScheduler("test", 1, nil, nil, nil, 1000, 0)
sche.AddNewNode("node1")
sche.AddNewTable(common.Table{1, 1}, 1)
require.Len(t, sche.Absent(), 1)
require.Len(t, sche.GetTasksBySchemaID(1), 1)
event := NewBlockEvent("test", sche, &heartbeatpb.State{
IsBlocked: true,
BlockTs: 10,
BlockTables: &heartbeatpb.InfluencedTables{
InfluenceType: heartbeatpb.InfluenceType_All,
},
UpdatedSchemas: []*heartbeatpb.SchemaIDChange{
{
TableID: 1,
OldSchemaID: 1,
NewSchemaID: 2,
},
}},
)
event.scheduleBlockEvent()
require.Len(t, sche.Absent(), 1)
// check the schema id and map is updated
require.Len(t, sche.GetTasksBySchemaID(1), 0)
require.Len(t, sche.GetTasksBySchemaID(2), 1)
require.Equal(t, sche.GetTasksByTableIDs(1)[0].Inferior.(*ReplicaSet).SchemaID, int64(2))
}
21 changes: 6 additions & 15 deletions maintainer/barrier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
"github.com/flowbehappy/tigate/pkg/messaging"
"github.com/flowbehappy/tigate/scheduler"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
Expand All @@ -36,22 +34,15 @@ func TestNormalBlock(t *testing.T) {
sche.AddNewNode("node2")
var blockedDispatcherIDS []*heartbeatpb.DispatcherID
for id := 1; id < 4; id++ {
span := spanz.TableIDToComparableSpan(int64(id))
tableSpan := &heartbeatpb.TableSpan{
TableID: int64(id),
StartKey: span.StartKey,
EndKey: span.EndKey,
}
dispatcherID := common.NewDispatcherID()
blockedDispatcherIDS = append(blockedDispatcherIDS, dispatcherID.ToPB())
replicaSet := NewReplicaSet(model.DefaultChangeFeedID("test"), dispatcherID, 1, tableSpan, 0)
stm := scheduler.NewStateMachine(dispatcherID, nil, replicaSet)
stm.State = scheduler.SchedulerStatusWorking
sche.Working()[dispatcherID] = stm
sche.AddNewTable(common.Table{1, int64(id)}, 0)
stm := sche.GetTasksByTableIDs(int64(id))[0]
blockedDispatcherIDS = append(blockedDispatcherIDS, stm.ID.ToPB())
stm.Primary = "node1"
sche.nodeTasks["node1"][dispatcherID] = stm
stm.State = scheduler.SchedulerStatusWorking
sche.tryMoveTask(stm.ID, stm, scheduler.SchedulerStatusAbsent, "", true)
}

// the last one is the writer
var selectDispatcherID = common.NewDispatcherIDFromPB(blockedDispatcherIDS[2])
sche.nodeTasks["node2"][selectDispatcherID] = sche.nodeTasks["node1"][selectDispatcherID]
dropID := sche.nodeTasks["node2"][selectDispatcherID].Inferior.(*ReplicaSet).Span.TableID
Expand Down
66 changes: 50 additions & 16 deletions maintainer/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type Scheduler struct {
// group the tasks by schema id
schemaTasks map[int64]map[common.DispatcherID]*scheduler.StateMachine[common.DispatcherID]
// tables
tableTasks map[int64]struct{}
tableTasks map[int64]map[common.DispatcherID]*scheduler.StateMachine[common.DispatcherID]
// totalMaps holds all state maps, absent, committing, working and removing
totalMaps []map[common.DispatcherID]*scheduler.StateMachine[common.DispatcherID]
bootstrapped bool
Expand All @@ -71,7 +71,7 @@ func NewScheduler(changefeedID string,
s := &Scheduler{
nodeTasks: make(map[node.ID]map[common.DispatcherID]*scheduler.StateMachine[common.DispatcherID]),
schemaTasks: make(map[int64]map[common.DispatcherID]*scheduler.StateMachine[common.DispatcherID]),
tableTasks: make(map[int64]struct{}),
tableTasks: make(map[int64]map[common.DispatcherID]*scheduler.StateMachine[common.DispatcherID]),
startCheckpointTs: checkpointTs,
changefeedID: changefeedID,
bootstrapped: false,
Expand Down Expand Up @@ -106,8 +106,8 @@ func (s *Scheduler) GetAllNodes() []node.ID {
}

func (s *Scheduler) AddNewTable(table common.Table, startTs uint64) {
_, ok := s.tableTasks[table.TableID]
if ok {
tables, ok := s.tableTasks[table.TableID]
if ok && len(tables) > 0 {
log.Warn("table already add, ignore",
zap.String("changefeed", s.changefeedID),
zap.Int64("schema", table.SchemaID),
Expand Down Expand Up @@ -215,23 +215,43 @@ func (s *Scheduler) RemoveTask(stm *scheduler.StateMachine[common.DispatcherID])
}

func (s *Scheduler) GetTasksByTableIDs(tableIDs ...int64) []*scheduler.StateMachine[common.DispatcherID] {
tableMap := make(map[int64]bool, len(tableIDs))
for _, tableID := range tableIDs {
tableMap[tableID] = true
}
var stms []*scheduler.StateMachine[common.DispatcherID]
for _, m := range s.totalMaps {
for _, stm := range m {
replica := stm.Inferior.(*ReplicaSet)
if !tableMap[replica.Span.TableID] {
continue
}
for _, tableID := range tableIDs {
for _, stm := range s.tableTasks[tableID] {
stms = append(stms, stm)
}
}
return stms
}

// UpdateSchemaID will update the schema id of the table, and move the task to the new schema map
// it called when rename a table to another schema
func (s *Scheduler) UpdateSchemaID(tableID, newSchemaID int64) {
for _, stm := range s.tableTasks[tableID] {
replicaSet := stm.Inferior.(*ReplicaSet)
oldSchemaID := replicaSet.SchemaID
// update schemaID
replicaSet.SchemaID = newSchemaID

//update schema map
schemaMap, ok := s.schemaTasks[oldSchemaID]
if ok {
delete(schemaMap, stm.ID)
//clear the map if empty
if len(schemaMap) == 0 {
delete(s.schemaTasks, oldSchemaID)
}
}
// add it to new schema map
newMap, ok := s.schemaTasks[newSchemaID]
if !ok {
newMap = make(map[common.DispatcherID]*scheduler.StateMachine[common.DispatcherID])
s.schemaTasks[newSchemaID] = newMap
}
newMap[stm.ID] = stm
}
}

func (s *Scheduler) AddNewNode(id node.ID) {
_, ok := s.nodeTasks[id]
if ok {
Expand Down Expand Up @@ -550,13 +570,21 @@ func (s *Scheduler) addNewSpans(schemaID, tableID int64,
dispatcherID, schemaID, newTableSpan, startTs).(*ReplicaSet)
stm := scheduler.NewStateMachine(dispatcherID, nil, replicaSet)
s.Absent()[dispatcherID] = stm
// modify the schema map
schemaMap, ok := s.schemaTasks[schemaID]
if !ok {
schemaMap = make(map[common.DispatcherID]*scheduler.StateMachine[common.DispatcherID])
s.schemaTasks[schemaID] = schemaMap
}
schemaMap[dispatcherID] = stm
s.tableTasks[tableID] = struct{}{}

// modify the table map
tableMap, ok := s.tableTasks[tableID]
if !ok {
tableMap = make(map[common.DispatcherID]*scheduler.StateMachine[common.DispatcherID])
s.tableTasks[tableID] = tableMap
}
tableMap[dispatcherID] = stm
}
}

Expand All @@ -578,7 +606,13 @@ func (s *Scheduler) tryMoveTask(dispatcherID common.DispatcherID,
delete(m, dispatcherID)
}
delete(s.schemaTasks[stm.Inferior.(*ReplicaSet).SchemaID], dispatcherID)
delete(s.tableTasks, stm.Inferior.(*ReplicaSet).Span.TableID)
if len(s.schemaTasks[stm.Inferior.(*ReplicaSet).SchemaID]) == 0 {
delete(s.schemaTasks, stm.Inferior.(*ReplicaSet).SchemaID)
}
delete(s.tableTasks[stm.Inferior.(*ReplicaSet).Span.TableID], dispatcherID)
if len(s.tableTasks[stm.Inferior.(*ReplicaSet).Span.TableID]) == 0 {
delete(s.tableTasks, stm.Inferior.(*ReplicaSet).Span.TableID)
}
}
// keep node task map is updated
if modifyNodeMap && oldPrimary != stm.Primary {
Expand Down

0 comments on commit 3557c88

Please sign in to comment.