diff --git a/cdc/processor/manager.go b/cdc/processor/manager.go index 4780e45120d..f6e62e1bc37 100644 --- a/cdc/processor/manager.go +++ b/cdc/processor/manager.go @@ -243,7 +243,7 @@ func (m *managerImpl) handleCommand(ctx cdcContext.Context) error { case commandTpQueryTableCount: count := 0 for _, p := range m.processors { - count += len(p.GetAllCurrentTables()) + count += p.GetAllCurrentTables() } select { case cmd.payload.(chan int) <- count: diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index be964ca54d6..47e503808fc 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -118,13 +118,15 @@ func (p *processor) checkReadyForMessages() bool { return p.changefeed != nil && p.changefeed.Status != nil } +var _ scheduler.TableExecutor = (*processor)(nil) + // AddTable implements TableExecutor interface. // AddTable may cause by the following scenario // 1. `Create Table`, a new table dispatched to the processor, `isPrepare` should be false // 2. Prepare phase for 2 phase scheduling, `isPrepare` should be true. // 3. 
Replicating phase for 2 phase scheduling, `isPrepare` should be false func (p *processor) AddTable( - ctx context.Context, tableID model.TableID, startTs model.Ts, isPrepare bool, + ctx context.Context, span tablepb.Span, startTs model.Ts, isPrepare bool, ) (bool, error) { if !p.checkReadyForMessages() { return false, nil @@ -135,7 +137,7 @@ func (p *processor) AddTable( zap.String("captureID", p.captureInfo.ID), zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), - zap.Int64("tableID", tableID), + zap.Int64("tableID", span.TableID), zap.Uint64("checkpointTs", startTs), zap.Bool("isPrepare", isPrepare)) } @@ -143,9 +145,9 @@ func (p *processor) AddTable( var alreadyExist bool var state tablepb.TableState if p.pullBasedSinking { - state, alreadyExist = p.sinkManager.GetTableState(tableID) + state, alreadyExist = p.sinkManager.GetTableState(span.TableID) } else { - table, ok := p.tables[tableID] + table, ok := p.tables[span.TableID] if ok { alreadyExist = true state = table.State() @@ -161,7 +163,7 @@ func (p *processor) AddTable( zap.String("captureID", p.captureInfo.ID), zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), - zap.Int64("tableID", tableID), + zap.Int64("tableID", span.TableID), zap.Uint64("checkpointTs", startTs), zap.Bool("isPrepare", isPrepare)) return true, nil @@ -170,11 +172,11 @@ func (p *processor) AddTable( // be stopped on original capture already, it's safe to start replicating data now. 
if !isPrepare { if p.pullBasedSinking { - if err := p.sinkManager.StartTable(tableID, startTs); err != nil { + if err := p.sinkManager.StartTable(span.TableID, startTs); err != nil { return false, errors.Trace(err) } } else { - p.tables[tableID].Start(startTs) + p.tables[span.TableID].Start(startTs) } } return true, nil @@ -183,7 +185,7 @@ func (p *processor) AddTable( zap.String("captureID", p.captureInfo.ID), zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), - zap.Int64("tableID", tableID), + zap.Int64("tableID", span.TableID), zap.Uint64("checkpointTs", startTs), zap.Bool("isPrepare", isPrepare)) return true, nil @@ -192,10 +194,10 @@ func (p *processor) AddTable( zap.String("captureID", p.captureInfo.ID), zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), - zap.Int64("tableID", tableID), + zap.Int64("tableID", span.TableID), zap.Uint64("checkpointTs", startTs), zap.Bool("isPrepare", isPrepare)) - p.removeTable(p.tables[tableID], tableID) + p.removeTable(p.tables[span.TableID], span.TableID) } } @@ -208,36 +210,37 @@ func (p *processor) AddTable( zap.String("captureID", p.captureInfo.ID), zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), - zap.Int64("tableID", tableID), + zap.Int64("tableID", span.TableID), zap.Uint64("checkpointTs", startTs), zap.Bool("isPrepare", isPrepare)) } if p.pullBasedSinking { - p.sourceManager.AddTable(ctx.(cdcContext.Context), tableID, p.getTableName(ctx, tableID), startTs) + p.sourceManager.AddTable( + ctx.(cdcContext.Context), span.TableID, p.getTableName(ctx, span.TableID), startTs) if p.redoManager.Enabled() { - p.redoManager.AddTable(tableID, startTs) + p.redoManager.AddTable(span.TableID, startTs) } - p.sinkManager.AddTable(tableID, startTs, p.changefeed.Info.TargetTs) + p.sinkManager.AddTable(span.TableID, startTs, p.changefeed.Info.TargetTs) if !isPrepare { - if err := 
p.sinkManager.StartTable(tableID, startTs); err != nil { + if err := p.sinkManager.StartTable(span.TableID, startTs); err != nil { return false, errors.Trace(err) } } } else { table, err := p.createTablePipeline( - ctx.(cdcContext.Context), tableID, &model.TableReplicaInfo{StartTs: startTs}) + ctx.(cdcContext.Context), span.TableID, &model.TableReplicaInfo{StartTs: startTs}) if err != nil { return false, errors.Trace(err) } - p.tables[tableID] = table + p.tables[span.TableID] = table if !isPrepare { table.Start(startTs) log.Debug("start table", zap.String("captureID", p.captureInfo.ID), zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), - zap.Int64("tableID", tableID), + zap.Int64("tableID", span.TableID), zap.Uint64("startTs", startTs)) } } @@ -246,31 +249,31 @@ func (p *processor) AddTable( } // RemoveTable implements TableExecutor interface. -func (p *processor) RemoveTable(tableID model.TableID) bool { +func (p *processor) RemoveTable(span tablepb.Span) bool { if !p.checkReadyForMessages() { return false } if p.pullBasedSinking { - _, exist := p.sinkManager.GetTableState(tableID) + _, exist := p.sinkManager.GetTableState(span.TableID) if !exist { log.Warn("Table which will be deleted is not found", zap.String("capture", p.captureInfo.ID), zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), - zap.Int64("tableID", tableID)) + zap.Int64("tableID", span.TableID)) return true } - p.sinkManager.AsyncStopTable(tableID) + p.sinkManager.AsyncStopTable(span.TableID) return true } - table, ok := p.tables[tableID] + table, ok := p.tables[span.TableID] if !ok { log.Warn("table which will be deleted is not found", zap.String("capture", p.captureInfo.ID), zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), - zap.Int64("tableID", tableID)) + zap.Int64("tableID", span.TableID)) return true } if !table.AsyncStop() { @@ -281,14 +284,14 @@ func (p 
*processor) RemoveTable(tableID model.TableID) bool { zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), zap.Uint64("checkpointTs", table.CheckpointTs()), - zap.Int64("tableID", tableID)) + zap.Int64("tableID", span.TableID)) return false } return true } // IsAddTableFinished implements TableExecutor interface. -func (p *processor) IsAddTableFinished(tableID model.TableID, isPrepare bool) bool { +func (p *processor) IsAddTableFinished(span tablepb.Span, isPrepare bool) bool { if !p.checkReadyForMessages() { return false } @@ -303,14 +306,14 @@ func (p *processor) IsAddTableFinished(tableID model.TableID, isPrepare bool) bo done := func() bool { var alreadyExist bool if p.pullBasedSinking { - state, alreadyExist = p.sinkManager.GetTableState(tableID) + state, alreadyExist = p.sinkManager.GetTableState(span.TableID) if alreadyExist { - stats := p.sinkManager.GetTableStats(tableID) + stats := p.sinkManager.GetTableStats(span.TableID) tableResolvedTs = stats.ResolvedTs tableCheckpointTs = stats.CheckpointTs } } else { - table, ok := p.tables[tableID] + table, ok := p.tables[span.TableID] if ok { alreadyExist = true state = table.State() @@ -324,7 +327,7 @@ func (p *processor) IsAddTableFinished(tableID model.TableID, isPrepare bool) bo zap.String("captureID", p.captureInfo.ID), zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), - zap.Int64("tableID", tableID), + zap.Int64("tableID", span.TableID), zap.Bool("isPrepare", isPrepare)) } @@ -340,7 +343,7 @@ func (p *processor) IsAddTableFinished(tableID model.TableID, isPrepare bool) bo zap.String("captureID", p.captureInfo.ID), zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), - zap.Int64("tableID", tableID), + zap.Int64("tableID", span.TableID), zap.Uint64("tableResolvedTs", tableResolvedTs), zap.Uint64("localResolvedTs", localResolvedTs), zap.Uint64("globalResolvedTs", globalResolvedTs), 
@@ -356,7 +359,7 @@ func (p *processor) IsAddTableFinished(tableID model.TableID, isPrepare bool) bo zap.String("captureID", p.captureInfo.ID), zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), - zap.Int64("tableID", tableID), + zap.Int64("tableID", span.TableID), zap.Uint64("tableResolvedTs", tableResolvedTs), zap.Uint64("localResolvedTs", localResolvedTs), zap.Uint64("globalResolvedTs", globalResolvedTs), @@ -369,7 +372,7 @@ func (p *processor) IsAddTableFinished(tableID model.TableID, isPrepare bool) bo } // IsRemoveTableFinished implements TableExecutor interface. -func (p *processor) IsRemoveTableFinished(tableID model.TableID) (model.Ts, bool) { +func (p *processor) IsRemoveTableFinished(span tablepb.Span) (model.Ts, bool) { if !p.checkReadyForMessages() { return 0, false } @@ -378,13 +381,13 @@ func (p *processor) IsRemoveTableFinished(tableID model.TableID) (model.Ts, bool var state tablepb.TableState var tableCheckpointTs uint64 if p.pullBasedSinking { - state, alreadyExist = p.sinkManager.GetTableState(tableID) + state, alreadyExist = p.sinkManager.GetTableState(span.TableID) if alreadyExist { - stats := p.sinkManager.GetTableStats(tableID) + stats := p.sinkManager.GetTableStats(span.TableID) tableCheckpointTs = stats.CheckpointTs } } else { - table, ok := p.tables[tableID] + table, ok := p.tables[span.TableID] if ok { alreadyExist = true state = table.State() @@ -397,7 +400,7 @@ func (p *processor) IsRemoveTableFinished(tableID model.TableID) (model.Ts, bool zap.String("captureID", p.captureInfo.ID), zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), - zap.Int64("tableID", tableID)) + zap.Int64("tableID", span.TableID)) return 0, true } @@ -407,53 +410,49 @@ func (p *processor) IsRemoveTableFinished(tableID model.TableID) (model.Ts, bool zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), zap.Uint64("checkpointTs", 
tableCheckpointTs), - zap.Int64("tableID", tableID), + zap.Int64("tableID", span.TableID), zap.Any("tableStatus", state)) return 0, false } if p.pullBasedSinking { - stats := p.sinkManager.GetTableStats(tableID) - p.sourceManager.RemoveTable(tableID) - p.sinkManager.RemoveTable(tableID) + stats := p.sinkManager.GetTableStats(span.TableID) + p.sourceManager.RemoveTable(span.TableID) + p.sinkManager.RemoveTable(span.TableID) if p.redoManager.Enabled() { - p.redoManager.RemoveTable(tableID) + p.redoManager.RemoveTable(span.TableID) } log.Info("table removed", zap.String("captureID", p.captureInfo.ID), zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), - zap.Int64("tableID", tableID), + zap.Int64("tableID", span.TableID), zap.Uint64("checkpointTs", stats.CheckpointTs)) return stats.CheckpointTs, true } - table := p.tables[tableID] + table := p.tables[span.TableID] p.metricRemainKVEventGauge.Sub(float64(table.RemainEvents())) table.Cancel() table.Wait() - delete(p.tables, tableID) + delete(p.tables, span.TableID) checkpointTs := table.CheckpointTs() log.Info("table removed", zap.String("captureID", p.captureInfo.ID), zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), - zap.Int64("tableID", tableID), + zap.Int64("tableID", span.TableID), zap.Uint64("checkpointTs", checkpointTs)) return checkpointTs, true } // GetAllCurrentTables implements TableExecutor interface. -func (p *processor) GetAllCurrentTables() []model.TableID { +func (p *processor) GetAllCurrentTables() int { if p.pullBasedSinking { - return p.sinkManager.GetAllCurrentTableIDs() - } - ret := make([]model.TableID, 0, len(p.tables)) - for tableID := range p.tables { - ret = append(ret, tableID) + return len(p.sinkManager.GetAllCurrentTableIDs()) } - return ret + return len(p.tables) } // GetCheckpoint implements TableExecutor interface. 
@@ -462,35 +461,39 @@ func (p *processor) GetCheckpoint() (checkpointTs, resolvedTs model.Ts) { } // GetTableStatus implements TableExecutor interface -func (p *processor) GetTableStatus(tableID model.TableID) tablepb.TableStatus { +func (p *processor) GetTableStatus(span tablepb.Span) tablepb.TableStatus { if p.pullBasedSinking { - state, exist := p.sinkManager.GetTableState(tableID) + state, exist := p.sinkManager.GetTableState(span.TableID) if !exist { return tablepb.TableStatus{ - TableID: tableID, + TableID: span.TableID, + Span: span, State: tablepb.TableStateAbsent, } } - sinkStats := p.sinkManager.GetTableStats(tableID) + sinkStats := p.sinkManager.GetTableStats(span.TableID) return tablepb.TableStatus{ - TableID: tableID, + TableID: span.TableID, + Span: span, Checkpoint: tablepb.Checkpoint{ CheckpointTs: sinkStats.CheckpointTs, ResolvedTs: sinkStats.ResolvedTs, }, State: state, - Stats: p.getStatsFromSourceManagerAndSinkManager(tableID, sinkStats), + Stats: p.getStatsFromSourceManagerAndSinkManager(span.TableID, sinkStats), } } - table, ok := p.tables[tableID] + table, ok := p.tables[span.TableID] if !ok { return tablepb.TableStatus{ - TableID: tableID, + TableID: span.TableID, + Span: span, State: tablepb.TableStateAbsent, } } return tablepb.TableStatus{ - TableID: tableID, + TableID: span.TableID, + Span: span, Checkpoint: tablepb.Checkpoint{ CheckpointTs: table.CheckpointTs(), ResolvedTs: table.ResolvedTs(), @@ -924,9 +927,10 @@ func (p *processor) newAgentImpl( messageRouter := ctx.GlobalVars().MessageRouter etcdClient := ctx.GlobalVars().EtcdClient captureID := ctx.GlobalVars().CaptureInfo.ID + cfg := config.GetGlobalServerConfig().Debug.Scheduler ret, err = scheduler.NewAgent( ctx, captureID, liveness, - messageServer, messageRouter, etcdClient, p, p.changefeedID) + messageServer, messageRouter, etcdClient, p, p.changefeedID, cfg) return ret, errors.Trace(err) } diff --git a/cdc/processor/processor_test.go b/cdc/processor/processor_test.go index 
93f13ebb7d5..1c2ab2862f3 100644 --- a/cdc/processor/processor_test.go +++ b/cdc/processor/processor_test.go @@ -33,6 +33,7 @@ import ( cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/etcd" "github.com/pingcap/tiflow/pkg/orchestrator" + "github.com/pingcap/tiflow/pkg/spanz" "github.com/pingcap/tiflow/pkg/upstream" "github.com/stretchr/testify/require" ) @@ -248,7 +249,7 @@ type mockAgent struct { } func (a *mockAgent) Tick(_ context.Context) error { - if len(a.executor.GetAllCurrentTables()) == 0 { + if a.executor.GetAllCurrentTables() == 0 { return nil } a.lastCheckpointTs, _ = a.executor.GetCheckpoint() @@ -287,7 +288,7 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) { tester.MustApplyPatches() // table-1: `preparing` -> `prepared` -> `replicating` - ok, err := p.AddTable(ctx, 1, 20, true) + ok, err := p.AddTable(ctx, spanz.TableIDToComparableSpan(1), 20, true) require.NoError(t, err) require.True(t, ok) @@ -301,7 +302,7 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) { checkpointTs := p.agent.GetLastSentCheckpointTs() require.Equal(t, checkpointTs, model.Ts(0)) - done := p.IsAddTableFinished(1, true) + done := p.IsAddTableFinished(spanz.TableIDToComparableSpan(1), true) require.False(t, done) require.Equal(t, tablepb.TableStatePreparing, table1.State()) @@ -312,7 +313,7 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) { require.Nil(t, err) tester.MustApplyPatches() - done = p.IsAddTableFinished(1, true) + done = p.IsAddTableFinished(spanz.TableIDToComparableSpan(1), true) require.True(t, done) require.Equal(t, tablepb.TableStatePrepared, table1.State()) @@ -320,12 +321,12 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) { checkpointTs = p.agent.GetLastSentCheckpointTs() require.Equal(t, checkpointTs, model.Ts(20)) - ok, err = p.AddTable(ctx, 1, 30, true) + ok, err = p.AddTable(ctx, spanz.TableIDToComparableSpan(1), 30, true) require.NoError(t, err) require.True(t, ok) require.Equal(t, 
model.Ts(0), table1.sinkStartTs) - ok, err = p.AddTable(ctx, 1, 30, false) + ok, err = p.AddTable(ctx, spanz.TableIDToComparableSpan(1), 30, false) require.NoError(t, err) require.True(t, ok) require.Equal(t, model.Ts(30), table1.sinkStartTs) @@ -336,7 +337,7 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) { require.Nil(t, err) tester.MustApplyPatches() - done = p.IsAddTableFinished(1, false) + done = p.IsAddTableFinished(spanz.TableIDToComparableSpan(1), false) require.True(t, done) require.Equal(t, tablepb.TableStateReplicating, table1.State()) @@ -370,32 +371,33 @@ func TestTableExecutorAddingTableDirectly(t *testing.T) { require.NoError(t, err) tester.MustApplyPatches() - ok, err := p.AddTable(ctx, 1, 20, false) + ok, err := p.AddTable(ctx, spanz.TableIDToComparableSpan(1), 20, false) require.NoError(t, err) require.True(t, ok) table1 := p.tables[1].(*mockTablePipeline) require.Equal(t, model.Ts(20), table1.sinkStartTs) require.Equal(t, tablepb.TableStatePreparing, table1.state) - meta := p.GetTableStatus(model.TableID(1)) + meta := p.GetTableStatus(spanz.TableIDToComparableSpan(1)) require.Equal(t, model.TableID(1), meta.TableID) + require.Equal(t, spanz.TableIDToComparableSpan(1), meta.Span) require.Equal(t, tablepb.TableStatePreparing, meta.State) - ok, err = p.AddTable(ctx, 2, 20, false) + ok, err = p.AddTable(ctx, spanz.TableIDToComparableSpan(2), 20, false) require.NoError(t, err) require.True(t, ok) table2 := p.tables[2].(*mockTablePipeline) require.Equal(t, model.Ts(20), table2.sinkStartTs) require.Equal(t, tablepb.TableStatePreparing, table2.state) - ok, err = p.AddTable(ctx, 3, 20, false) + ok, err = p.AddTable(ctx, spanz.TableIDToComparableSpan(3), 20, false) require.NoError(t, err) require.True(t, ok) table3 := p.tables[3].(*mockTablePipeline) require.Equal(t, model.Ts(20), table3.sinkStartTs) require.Equal(t, tablepb.TableStatePreparing, table3.state) - ok, err = p.AddTable(ctx, 4, 20, false) + ok, err = p.AddTable(ctx, 
spanz.TableIDToComparableSpan(4), 20, false) require.NoError(t, err) require.True(t, ok) table4 := p.tables[4].(*mockTablePipeline) @@ -407,16 +409,16 @@ func TestTableExecutorAddingTableDirectly(t *testing.T) { checkpointTs := p.agent.GetLastSentCheckpointTs() require.Equal(t, checkpointTs, model.Ts(0)) - done := p.IsAddTableFinished(1, false) + done := p.IsAddTableFinished(spanz.TableIDToComparableSpan(1), false) require.False(t, done) require.Equal(t, tablepb.TableStatePreparing, table1.State()) - done = p.IsAddTableFinished(2, false) + done = p.IsAddTableFinished(spanz.TableIDToComparableSpan(2), false) require.False(t, done) require.Equal(t, tablepb.TableStatePreparing, table2.State()) - done = p.IsAddTableFinished(3, false) + done = p.IsAddTableFinished(spanz.TableIDToComparableSpan(3), false) require.False(t, done) require.Equal(t, tablepb.TableStatePreparing, table3.State()) - done = p.IsAddTableFinished(4, false) + done = p.IsAddTableFinished(spanz.TableIDToComparableSpan(4), false) require.False(t, done) require.Equal(t, tablepb.TableStatePreparing, table4.State()) require.Len(t, p.tables, 4) @@ -436,16 +438,16 @@ func TestTableExecutorAddingTableDirectly(t *testing.T) { table3.checkpointTs = 30 table4.checkpointTs = 30 - done = p.IsAddTableFinished(1, false) + done = p.IsAddTableFinished(spanz.TableIDToComparableSpan(1), false) require.True(t, done) require.Equal(t, tablepb.TableStateReplicating, table1.State()) - done = p.IsAddTableFinished(2, false) + done = p.IsAddTableFinished(spanz.TableIDToComparableSpan(2), false) require.True(t, done) require.Equal(t, tablepb.TableStateReplicating, table2.State()) - done = p.IsAddTableFinished(3, false) + done = p.IsAddTableFinished(spanz.TableIDToComparableSpan(3), false) require.True(t, done) require.Equal(t, tablepb.TableStateReplicating, table3.State()) - done = p.IsAddTableFinished(4, false) + done = p.IsAddTableFinished(spanz.TableIDToComparableSpan(4), false) require.True(t, done) require.Equal(t, 
tablepb.TableStateReplicating, table4.State()) @@ -471,7 +473,7 @@ func TestTableExecutorAddingTableDirectly(t *testing.T) { require.NoError(t, err) tester.MustApplyPatches() - ok = p.RemoveTable(3) + ok = p.RemoveTable(spanz.TableIDToComparableSpan(3)) require.True(t, ok) err = p.Tick(ctx) @@ -483,7 +485,7 @@ func TestTableExecutorAddingTableDirectly(t *testing.T) { require.False(t, table3.canceled) require.Equal(t, model.Ts(60), table3.CheckpointTs()) - checkpointTs, done = p.IsRemoveTableFinished(3) + checkpointTs, done = p.IsRemoveTableFinished(spanz.TableIDToComparableSpan(3)) require.False(t, done) require.Equal(t, model.Ts(0), checkpointTs) @@ -506,11 +508,12 @@ func TestTableExecutorAddingTableDirectly(t *testing.T) { require.Len(t, p.tables, 4) require.False(t, table3.canceled) - checkpointTs, done = p.IsRemoveTableFinished(3) + checkpointTs, done = p.IsRemoveTableFinished(spanz.TableIDToComparableSpan(3)) require.True(t, done) require.Equal(t, model.Ts(65), checkpointTs) - meta = p.GetTableStatus(model.TableID(3)) + meta = p.GetTableStatus(spanz.TableIDToComparableSpan(3)) require.Equal(t, model.TableID(3), meta.TableID) + require.Equal(t, spanz.TableIDToComparableSpan(3), meta.Span) require.Equal(t, tablepb.TableStateAbsent, meta.State) require.Len(t, p.tables, 3) @@ -602,10 +605,10 @@ func TestProcessorClose(t *testing.T) { tester.MustApplyPatches() // add tables - done, err := p.AddTable(ctx, model.TableID(1), 20, false) + done, err := p.AddTable(ctx, spanz.TableIDToComparableSpan(1), 20, false) require.Nil(t, err) require.True(t, done) - done, err = p.AddTable(ctx, model.TableID(2), 30, false) + done, err = p.AddTable(ctx, spanz.TableIDToComparableSpan(2), 30, false) require.Nil(t, err) require.True(t, done) @@ -642,10 +645,10 @@ func TestProcessorClose(t *testing.T) { tester.MustApplyPatches() // add tables - done, err = p.AddTable(ctx, model.TableID(1), 20, false) + done, err = p.AddTable(ctx, spanz.TableIDToComparableSpan(1), 20, false) 
require.Nil(t, err) require.True(t, done) - done, err = p.AddTable(ctx, model.TableID(2), 30, false) + done, err = p.AddTable(ctx, spanz.TableIDToComparableSpan(2), 30, false) require.Nil(t, err) require.True(t, done) err = p.Tick(ctx) @@ -675,10 +678,10 @@ func TestPositionDeleted(t *testing.T) { p, tester := initProcessor4Test(ctx, t, &liveness) var err error // add table - done, err := p.AddTable(ctx, model.TableID(1), 30, false) + done, err := p.AddTable(ctx, spanz.TableIDToComparableSpan(1), 30, false) require.Nil(t, err) require.True(t, done) - done, err = p.AddTable(ctx, model.TableID(2), 40, false) + done, err = p.AddTable(ctx, spanz.TableIDToComparableSpan(2), 40, false) require.Nil(t, err) require.True(t, done) // init tick @@ -689,11 +692,11 @@ func TestPositionDeleted(t *testing.T) { table1 := p.tables[1].(*mockTablePipeline) table2 := p.tables[2].(*mockTablePipeline) - table1.resolvedTs += 1 - table2.resolvedTs += 1 + table1.resolvedTs++ + table2.resolvedTs++ - table1.checkpointTs += 1 - table2.checkpointTs += 1 + table1.checkpointTs++ + table2.checkpointTs++ // cal position err = p.Tick(ctx) @@ -795,7 +798,7 @@ func TestUpdateBarrierTs(t *testing.T) { }) p.schemaStorage.(*mockSchemaStorage).resolvedTs = 10 - done, err := p.AddTable(ctx, model.TableID(1), 5, false) + done, err := p.AddTable(ctx, spanz.TableIDToComparableSpan(1), 5, false) require.True(t, done) require.Nil(t, err) err = p.Tick(ctx) diff --git a/cdc/scheduler/internal/table_executor.go b/cdc/scheduler/internal/table_executor.go index 5c7736f9ec3..680c82eb424 100644 --- a/cdc/scheduler/internal/table_executor.go +++ b/cdc/scheduler/internal/table_executor.go @@ -30,18 +30,18 @@ type TableExecutor interface { // if `isPrepare` is true, the 1st phase of the 2 phase scheduling protocol. // if `isPrepare` is false, the 2nd phase. 
AddTable( - ctx context.Context, tableID model.TableID, startTs model.Ts, isPrepare bool, + ctx context.Context, span tablepb.Span, startTs model.Ts, isPrepare bool, ) (done bool, err error) // IsAddTableFinished make sure the requested table is in the proper status - IsAddTableFinished(tableID model.TableID, isPrepare bool) (done bool) + IsAddTableFinished(span tablepb.Span, isPrepare bool) (done bool) // RemoveTable remove the table, return true if the table is already removed - RemoveTable(tableID model.TableID) (done bool) + RemoveTable(span tablepb.Span) (done bool) // IsRemoveTableFinished convince the table is fully stopped. // return false if table is not stopped // return true and corresponding checkpoint otherwise. - IsRemoveTableFinished(tableID model.TableID) (model.Ts, bool) + IsRemoveTableFinished(span tablepb.Span) (model.Ts, bool) // GetAllCurrentTables should return all tables that are being run, // being added and being removed. @@ -49,7 +49,7 @@ type TableExecutor interface { // NOTE: two subsequent calls to the method should return the same // result, unless there is a call to AddTable, RemoveTable, IsAddTableFinished // or IsRemoveTableFinished in between two calls to this method. - GetAllCurrentTables() []model.TableID + GetAllCurrentTables() int // GetCheckpoint returns the local checkpoint-ts and resolved-ts of // the processor. 
Its calculation should take into consideration all @@ -58,5 +58,5 @@ type TableExecutor interface { GetCheckpoint() (checkpointTs, resolvedTs model.Ts) // GetTableStatus return the checkpoint and resolved ts for the given table - GetTableStatus(tableID model.TableID) tablepb.TableStatus + GetTableStatus(span tablepb.Span) tablepb.TableStatus } diff --git a/cdc/scheduler/internal/v3/agent/agent.go b/cdc/scheduler/internal/v3/agent/agent.go index db302864bb2..5e8be028ecd 100644 --- a/cdc/scheduler/internal/v3/agent/agent.go +++ b/cdc/scheduler/internal/v3/agent/agent.go @@ -23,8 +23,10 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/processor/tablepb" "github.com/pingcap/tiflow/cdc/scheduler/internal" + "github.com/pingcap/tiflow/cdc/scheduler/internal/v3/compat" "github.com/pingcap/tiflow/cdc/scheduler/internal/v3/transport" "github.com/pingcap/tiflow/cdc/scheduler/schedulepb" + "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/etcd" "github.com/pingcap/tiflow/pkg/p2p" @@ -37,7 +39,8 @@ var _ internal.Agent = (*agent)(nil) type agent struct { agentInfo - trans transport.Transport + trans transport.Transport + compat *compat.Compat tableM *tableManager @@ -74,9 +77,8 @@ func newAgentInfo(changefeedID model.ChangeFeedID, captureID model.CaptureID) ag } type ownerInfo struct { - Revision schedulepb.OwnerRevision - Version string - CaptureID string + model.CaptureInfo + Revision schedulepb.OwnerRevision } func newAgent( @@ -86,11 +88,13 @@ func newAgent( changeFeedID model.ChangeFeedID, etcdClient etcd.CDCEtcdClient, tableExecutor internal.TableExecutor, + cfg *config.SchedulerConfig, ) (internal.Agent, error) { result := &agent{ agentInfo: newAgentInfo(changeFeedID, captureID), tableM: newTableManager(changeFeedID, tableExecutor), liveness: liveness, + compat: compat.New(cfg, map[model.CaptureID]*model.CaptureInfo{}), } etcdCliCtx, cancel := context.WithTimeout(ctx, 
5*time.Second) @@ -111,6 +115,25 @@ func newAgent( zap.Error(err)) return result, nil } + var ownerCaptureInfo *model.CaptureInfo + _, captures, err := etcdClient.GetCaptures(ctx) + for _, captureInfo := range captures { + if captureInfo.ID == ownerCaptureID { + ownerCaptureInfo = captureInfo + break + } + } + if ownerCaptureInfo == nil { + log.Info("schedulerv3: no owner found. We will wait for an owner to contact us.", + zap.String("ownerCaptureID", ownerCaptureID), + zap.String("namespace", changeFeedID.Namespace), + zap.String("changefeed", changeFeedID.ID), + zap.Error(err)) + return result, nil + } + result.compat.UpdateCaptureInfo(map[model.CaptureID]*model.CaptureInfo{ + ownerCaptureID: ownerCaptureInfo, + }) log.Info("schedulerv3: agent owner found", zap.String("ownerCaptureID", ownerCaptureID), @@ -133,11 +156,12 @@ func newAgent( return nil, err } + // We don't need address, and owner info will be updated when there is a + // new owner elected. To avoid confusion, just leave it empty. 
+ ownerCaptureInfo.AdvertiseAddr = "" result.ownerInfo = ownerInfo{ - // owner's version can only be got by receiving heartbeat - Version: "", - CaptureID: ownerCaptureID, - Revision: schedulepb.OwnerRevision{Revision: revision}, + Revision: schedulepb.OwnerRevision{Revision: revision}, + CaptureInfo: *ownerCaptureInfo, } return result, nil } @@ -151,9 +175,10 @@ func NewAgent(ctx context.Context, messageRouter p2p.MessageRouter, etcdClient etcd.CDCEtcdClient, tableExecutor internal.TableExecutor, + cfg *config.SchedulerConfig, ) (internal.Agent, error) { result, err := newAgent( - ctx, captureID, liveness, changeFeedID, etcdClient, tableExecutor) + ctx, captureID, liveness, changeFeedID, etcdClient, tableExecutor, cfg) if err != nil { return nil, errors.Trace(err) } @@ -235,17 +260,18 @@ func (a *agent) handleMessage(msg []*schedulepb.Message) []*schedulepb.Message { func (a *agent) handleMessageHeartbeat(request *schedulepb.Heartbeat) *schedulepb.Message { allTables := a.tableM.getAllTables() - result := make([]tablepb.TableStatus, 0, len(allTables)) - for _, table := range allTables { + result := make([]tablepb.TableStatus, 0, allTables.Len()) + allTables.Ascend(func(span tablepb.Span, table *table) bool { status := table.getTableStatus() if table.task != nil && table.task.IsRemove { status.State = tablepb.TableStateStopping } result = append(result, status) - } - for _, tableID := range request.GetTableIDs() { - if _, ok := allTables[tableID]; !ok { - status := a.tableM.getTableStatus(tableID) + return true + }) + for _, span := range request.GetSpans() { + if _, ok := allTables.Get(span); !ok { + status := a.tableM.getTableStatus(span) result = append(result, status) } } @@ -280,7 +306,7 @@ const ( ) type dispatchTableTask struct { - TableID model.TableID + Span tablepb.Span StartTs model.Ts IsRemove bool IsPrepare bool @@ -311,31 +337,31 @@ func (a *agent) handleMessageDispatchTableRequest( // this should be guaranteed by the caller of the method. 
switch req := request.Request.(type) { case *schedulepb.DispatchTableRequest_AddTable: - tableID := req.AddTable.GetTableID() + span := req.AddTable.GetSpan() task = &dispatchTableTask{ - TableID: tableID, + Span: span, StartTs: req.AddTable.GetCheckpoint().CheckpointTs, IsRemove: false, IsPrepare: req.AddTable.GetIsSecondary(), Epoch: epoch, status: dispatchTableTaskReceived, } - table = a.tableM.addTable(tableID) + table = a.tableM.addTable(span) case *schedulepb.DispatchTableRequest_RemoveTable: - tableID := req.RemoveTable.GetTableID() - table, ok = a.tableM.getTable(tableID) + span := req.RemoveTable.GetSpan() + table, ok = a.tableM.getTable(span) if !ok { log.Warn("schedulerv3: agent ignore remove table request, "+ "since the table not found", zap.String("capture", a.CaptureID), zap.String("namespace", a.ChangeFeedID.Namespace), zap.String("changefeed", a.ChangeFeedID.ID), - zap.Any("tableID", tableID), + zap.Stringer("span", &span), zap.Any("request", request)) return } task = &dispatchTableTask{ - TableID: tableID, + Span: span, IsRemove: true, Epoch: epoch, status: dispatchTableTaskReceived, @@ -373,7 +399,7 @@ func (a *agent) Close() error { // version: the incoming owner's semantic version string func (a *agent) handleOwnerInfo(id model.CaptureID, revision int64, version string) bool { if a.ownerInfo.Revision.Revision == revision { - if a.ownerInfo.CaptureID != id { + if a.ownerInfo.ID != id { // This panic will happen only if two messages have been received // with the same ownerRev but with different ownerIDs. // This should never happen unless the election via Etcd is buggy. 
@@ -381,7 +407,7 @@ func (a *agent) handleOwnerInfo(id model.CaptureID, revision int64, version stri zap.String("capture", a.CaptureID), zap.String("namespace", a.ChangeFeedID.Namespace), zap.String("changefeed", a.ChangeFeedID.ID), - zap.String("expected", a.ownerInfo.CaptureID), + zap.String("expected", a.ownerInfo.ID), zap.String("actual", id)) } return true @@ -389,12 +415,15 @@ func (a *agent) handleOwnerInfo(id model.CaptureID, revision int64, version stri // the current owner is staled if a.ownerInfo.Revision.Revision < revision { - a.ownerInfo.CaptureID = id + a.ownerInfo.CaptureInfo.ID = id + a.ownerInfo.CaptureInfo.Version = version a.ownerInfo.Revision.Revision = revision - a.ownerInfo.Version = version a.resetEpoch() + a.compat.UpdateCaptureInfo(map[model.CaptureID]*model.CaptureInfo{ + id: &a.ownerInfo.CaptureInfo, + }) log.Info("schedulerv3: new owner in power", zap.String("capture", a.CaptureID), zap.String("namespace", a.ChangeFeedID.Namespace), @@ -409,9 +438,11 @@ func (a *agent) handleOwnerInfo(id model.CaptureID, revision int64, version stri zap.String("namespace", a.ChangeFeedID.Namespace), zap.String("changefeed", a.ChangeFeedID.ID), zap.Any("staledOwner", ownerInfo{ - CaptureID: id, - Revision: schedulepb.OwnerRevision{Revision: revision}, - Version: version, + CaptureInfo: model.CaptureInfo{ + ID: id, + Version: version, + }, + Revision: schedulepb.OwnerRevision{Revision: revision}, }), zap.Any("owner", a.ownerInfo), zap.Any("agent", a.agentInfo)) @@ -433,6 +464,7 @@ func (a *agent) recvMsgs(ctx context.Context) ([]*schedulepb.Message, error) { messages[n] = val n++ } + a.compat.AfterTransportReceive(messages[:n]) return messages[:n], nil } @@ -452,7 +484,8 @@ func (a *agent) sendMsgs(ctx context.Context, msgs []*schedulepb.Message) error ProcessorEpoch: a.Epoch, } m.From = a.CaptureID - m.To = a.ownerInfo.CaptureID + m.To = a.ownerInfo.ID } + a.compat.BeforeTransportSend(msgs) return a.trans.Send(ctx, msgs) } diff --git 
a/cdc/scheduler/internal/v3/agent/agent_bench_test.go b/cdc/scheduler/internal/v3/agent/agent_bench_test.go index e82325a4db8..d6020fdb551 100644 --- a/cdc/scheduler/internal/v3/agent/agent_bench_test.go +++ b/cdc/scheduler/internal/v3/agent/agent_bench_test.go @@ -19,18 +19,22 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/scheduler/schedulepb" + "github.com/pingcap/tiflow/pkg/spanz" ) func benchmarkHeartbeatResponse(b *testing.B, bench func(b *testing.B, a *agent)) { upperBound := 16384 for size := 1; size <= upperBound; size *= 2 { tableExec := newMockTableExecutor() + liveness := model.LivenessCaptureAlive a := &agent{ - tableM: newTableManager(model.ChangeFeedID{}, tableExec), + tableM: newTableManager(model.ChangeFeedID{}, tableExec), + liveness: &liveness, } for j := 0; j < size; j++ { - _ = a.tableM.addTable(model.TableID(10000 + j)) + span := spanz.TableIDToComparableSpan(model.TableID(10000 + j)) + _ = a.tableM.addTable(span) } b.ResetTimer() @@ -41,7 +45,7 @@ func benchmarkHeartbeatResponse(b *testing.B, bench func(b *testing.B, a *agent) func BenchmarkRefreshAllTables(b *testing.B) { benchmarkHeartbeatResponse(b, func(b *testing.B, a *agent) { - total := len(a.tableM.tables) + total := a.tableM.tables.Len() b.Run(fmt.Sprintf("%d tables", total), func(b *testing.B) { for i := 0; i < b.N; i++ { a.handleMessageHeartbeat(&schedulepb.Heartbeat{}) diff --git a/cdc/scheduler/internal/v3/agent/agent_test.go b/cdc/scheduler/internal/v3/agent/agent_test.go index 4727db6e27f..15930f498fc 100644 --- a/cdc/scheduler/internal/v3/agent/agent_test.go +++ b/cdc/scheduler/internal/v3/agent/agent_test.go @@ -22,10 +22,14 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/processor/tablepb" + "github.com/pingcap/tiflow/cdc/scheduler/internal" + "github.com/pingcap/tiflow/cdc/scheduler/internal/v3/compat" "github.com/pingcap/tiflow/cdc/scheduler/internal/v3/transport" 
"github.com/pingcap/tiflow/cdc/scheduler/schedulepb" + "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" mock_etcd "github.com/pingcap/tiflow/pkg/etcd/mock" + "github.com/pingcap/tiflow/pkg/spanz" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.etcd.io/etcd/client/v3/concurrency" @@ -60,10 +64,14 @@ func iterPermutation(sequence []int, fn func(sequence []int)) { func newAgent4Test() *agent { a := &agent{ ownerInfo: ownerInfo{ - Version: "owner-version-1", - CaptureID: "owner-1", - Revision: schedulepb.OwnerRevision{Revision: 1}, + CaptureInfo: model.CaptureInfo{ + Version: "owner-version-1", + ID: "owner-1", + }, + Revision: schedulepb.OwnerRevision{Revision: 1}, }, + compat: compat.New( + config.GetDefaultServerConfig().Debug.Scheduler, map[string]*model.CaptureInfo{}), } a.Version = "agent-version-1" @@ -82,12 +90,15 @@ func TestNewAgent(t *testing.T) { me := mock_etcd.NewMockCDCEtcdClient(gomock.NewController(t)) tableExector := newMockTableExecutor() + cfg := &config.SchedulerConfig{RegionPerSpan: 1} // owner and revision found successfully - me.EXPECT().GetOwnerID(gomock.Any()).Return("owneID", nil).Times(1) + me.EXPECT().GetOwnerID(gomock.Any()).Return("ownerID", nil).Times(1) + me.EXPECT().GetCaptures( + gomock.Any()).Return(int64(0), []*model.CaptureInfo{{ID: "ownerID"}}, nil).Times(1) me.EXPECT().GetOwnerRevision(gomock.Any(), gomock.Any()).Return(int64(2333), nil).Times(1) a, err := newAgent( - context.Background(), "capture-test", &liveness, changefeed, me, tableExector) + context.Background(), "capture-test", &liveness, changefeed, me, tableExector, cfg) require.NoError(t, err) require.NotNil(t, a) @@ -95,31 +106,35 @@ func TestNewAgent(t *testing.T) { me.EXPECT().GetOwnerID(gomock.Any()). 
Return("", concurrency.ErrElectionNoLeader).Times(1) a, err = newAgent( - context.Background(), "capture-test", &liveness, changefeed, me, tableExector) + context.Background(), "capture-test", &liveness, changefeed, me, tableExector, cfg) require.NoError(t, err) require.NotNil(t, a) // owner not found since pd is unstable me.EXPECT().GetOwnerID(gomock.Any()).Return("", cerror.ErrPDEtcdAPIError).Times(1) a, err = newAgent( - context.Background(), "capture-test", &liveness, changefeed, me, tableExector) + context.Background(), "capture-test", &liveness, changefeed, me, tableExector, cfg) require.Error(t, err) require.Nil(t, a) // owner found, get revision failed. me.EXPECT().GetOwnerID(gomock.Any()).Return("ownerID", nil).Times(1) + me.EXPECT().GetCaptures( + gomock.Any()).Return(int64(0), []*model.CaptureInfo{{ID: "ownerID"}}, nil).Times(1) me.EXPECT().GetOwnerRevision(gomock.Any(), gomock.Any()). Return(int64(0), cerror.ErrPDEtcdAPIError).Times(1) a, err = newAgent( - context.Background(), "capture-test", &liveness, changefeed, me, tableExector) + context.Background(), "capture-test", &liveness, changefeed, me, tableExector, cfg) require.Error(t, err) require.Nil(t, a) me.EXPECT().GetOwnerID(gomock.Any()).Return("ownerID", nil).Times(1) + me.EXPECT().GetCaptures( + gomock.Any()).Return(int64(0), []*model.CaptureInfo{{ID: "ownerID"}}, nil).Times(1) me.EXPECT().GetOwnerRevision(gomock.Any(), gomock.Any()). 
Return(int64(0), cerror.ErrOwnerNotFound).Times(1) a, err = newAgent( - context.Background(), "capture-test", &liveness, changefeed, me, tableExector) + context.Background(), "capture-test", &liveness, changefeed, me, tableExector, cfg) require.NoError(t, err) require.NotNil(t, a) } @@ -134,7 +149,7 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) { removeTableRequest := &schedulepb.DispatchTableRequest{ Request: &schedulepb.DispatchTableRequest_RemoveTable{ RemoveTable: &schedulepb.RemoveTableRequest{ - TableID: 1, + Span: spanz.TableIDToComparableSpan(1), }, }, } @@ -150,7 +165,7 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) { addTableRequest := &schedulepb.DispatchTableRequest{ Request: &schedulepb.DispatchTableRequest_AddTable{ AddTable: &schedulepb.AddTableRequest{ - TableID: 1, + Span: spanz.TableIDToComparableSpan(1), IsSecondary: true, }, }, @@ -169,9 +184,9 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) { addTableResponse, ok := responses[0].DispatchTableResponse. Response.(*schedulepb.DispatchTableResponse_AddTable) require.True(t, ok) - require.Equal(t, model.TableID(1), addTableResponse.AddTable.Status.TableID) + require.Equal(t, model.TableID(1), addTableResponse.AddTable.Status.Span.TableID) require.Equal(t, tablepb.TableStateAbsent, addTableResponse.AddTable.Status.State) - require.NotContains(t, a.tableM.tables, model.TableID(1)) + require.False(t, a.tableM.tables.Has(spanz.TableIDToComparableSpan(1))) // Force set liveness to alive. *a.liveness = model.LivenessCaptureAlive @@ -196,9 +211,9 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) { addTableResponse, ok = responses[0].DispatchTableResponse. 
Response.(*schedulepb.DispatchTableResponse_AddTable) require.True(t, ok) - require.Equal(t, model.TableID(1), addTableResponse.AddTable.Status.TableID) + require.Equal(t, model.TableID(1), addTableResponse.AddTable.Status.Span.TableID) require.Equal(t, tablepb.TableStatePrepared, addTableResponse.AddTable.Status.State) - require.Contains(t, a.tableM.tables, model.TableID(1)) + require.True(t, a.tableM.tables.Has(spanz.TableIDToComparableSpan(1))) // let the prepared table become replicating, by set `IsSecondary` to false. addTableRequest.Request.(*schedulepb.DispatchTableRequest_AddTable). @@ -217,9 +232,9 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) { addTableResponse, ok = responses[0].DispatchTableResponse. Response.(*schedulepb.DispatchTableResponse_AddTable) require.True(t, ok) - require.Equal(t, model.TableID(1), addTableResponse.AddTable.Status.TableID) + require.Equal(t, model.TableID(1), addTableResponse.AddTable.Status.Span.TableID) require.Equal(t, tablepb.TableStatePrepared, addTableResponse.AddTable.Status.State) - require.Contains(t, a.tableM.tables, model.TableID(1)) + require.True(t, a.tableM.tables.Has(spanz.TableIDToComparableSpan(1))) mockTableExecutor.ExpectedCalls = nil mockTableExecutor.On("IsAddTableFinished", mock.Anything, @@ -232,9 +247,9 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) { addTableResponse, ok = responses[0].DispatchTableResponse. Response.(*schedulepb.DispatchTableResponse_AddTable) require.True(t, ok) - require.Equal(t, model.TableID(1), addTableResponse.AddTable.Status.TableID) + require.Equal(t, model.TableID(1), addTableResponse.AddTable.Status.Span.TableID) require.Equal(t, tablepb.TableStateReplicating, addTableResponse.AddTable.Status.State) - require.Contains(t, a.tableM.tables, model.TableID(1)) + require.True(t, a.tableM.tables.Has(spanz.TableIDToComparableSpan(1))) mockTableExecutor.On("RemoveTable", mock.Anything, mock.Anything). 
Return(false) @@ -246,9 +261,9 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) { removeTableResponse, ok := responses[0].DispatchTableResponse. Response.(*schedulepb.DispatchTableResponse_RemoveTable) require.True(t, ok) - require.Equal(t, model.TableID(1), removeTableResponse.RemoveTable.Status.TableID) + require.Equal(t, model.TableID(1), removeTableResponse.RemoveTable.Status.Span.TableID) require.Equal(t, tablepb.TableStateStopping, removeTableResponse.RemoveTable.Status.State) - require.Contains(t, a.tableM.tables, model.TableID(1)) + require.True(t, a.tableM.tables.Has(spanz.TableIDToComparableSpan(1))) mockTableExecutor.ExpectedCalls = nil mockTableExecutor.On("RemoveTable", mock.Anything, mock.Anything). @@ -263,7 +278,7 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) { removeTableResponse, ok = responses[0].DispatchTableResponse. Response.(*schedulepb.DispatchTableResponse_RemoveTable) require.True(t, ok) - require.Equal(t, model.TableID(1), removeTableResponse.RemoveTable.Status.TableID) + require.Equal(t, model.TableID(1), removeTableResponse.RemoveTable.Status.Span.TableID) require.Equal(t, tablepb.TableStateStopping, removeTableResponse.RemoveTable.Status.State) mockTableExecutor.ExpectedCalls = mockTableExecutor.ExpectedCalls[:1] @@ -277,10 +292,10 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) { removeTableResponse, ok = responses[0].DispatchTableResponse. 
Response.(*schedulepb.DispatchTableResponse_RemoveTable) require.True(t, ok) - require.Equal(t, model.TableID(1), removeTableResponse.RemoveTable.Status.TableID) + require.Equal(t, model.TableID(1), removeTableResponse.RemoveTable.Status.Span.TableID) require.Equal(t, tablepb.TableStateStopped, removeTableResponse.RemoveTable.Status.State) require.Equal(t, model.Ts(3), removeTableResponse.RemoveTable.Checkpoint.CheckpointTs) - require.NotContains(t, a.tableM.tables, model.TableID(1)) + require.False(t, a.tableM.tables.Has(spanz.TableIDToComparableSpan(1))) } func TestAgentHandleMessageHeartbeat(t *testing.T) { @@ -291,20 +306,25 @@ func TestAgentHandleMessageHeartbeat(t *testing.T) { a.tableM = newTableManager(model.ChangeFeedID{}, mockTableExecutor) for i := 0; i < 5; i++ { - a.tableM.addTable(model.TableID(i)) + a.tableM.addTable(spanz.TableIDToComparableSpan(int64(i))) } - a.tableM.tables[model.TableID(0)].state = tablepb.TableStatePreparing - a.tableM.tables[model.TableID(1)].state = tablepb.TableStatePrepared - a.tableM.tables[model.TableID(2)].state = tablepb.TableStateReplicating - a.tableM.tables[model.TableID(3)].state = tablepb.TableStateStopping - a.tableM.tables[model.TableID(4)].state = tablepb.TableStateStopped - - mockTableExecutor.tables[model.TableID(0)] = tablepb.TableStatePreparing - mockTableExecutor.tables[model.TableID(1)] = tablepb.TableStatePrepared - mockTableExecutor.tables[model.TableID(2)] = tablepb.TableStateReplicating - mockTableExecutor.tables[model.TableID(3)] = tablepb.TableStateStopping - mockTableExecutor.tables[model.TableID(4)] = tablepb.TableStateStopped + a.tableM.tables.GetV(spanz.TableIDToComparableSpan(0)).state = tablepb.TableStatePreparing + a.tableM.tables.GetV(spanz.TableIDToComparableSpan(1)).state = tablepb.TableStatePrepared + a.tableM.tables.GetV(spanz.TableIDToComparableSpan(2)).state = tablepb.TableStateReplicating + a.tableM.tables.GetV(spanz.TableIDToComparableSpan(3)).state = tablepb.TableStateStopping + 
a.tableM.tables.GetV(spanz.TableIDToComparableSpan(4)).state = tablepb.TableStateStopped + + mockTableExecutor.tables.ReplaceOrInsert( + spanz.TableIDToComparableSpan(0), tablepb.TableStatePreparing) + mockTableExecutor.tables.ReplaceOrInsert( + spanz.TableIDToComparableSpan(1), tablepb.TableStatePrepared) + mockTableExecutor.tables.ReplaceOrInsert( + spanz.TableIDToComparableSpan(2), tablepb.TableStateReplicating) + mockTableExecutor.tables.ReplaceOrInsert( + spanz.TableIDToComparableSpan(3), tablepb.TableStateStopping) + mockTableExecutor.tables.ReplaceOrInsert( + spanz.TableIDToComparableSpan(4), tablepb.TableStateStopped) heartbeat := &schedulepb.Message{ Header: &schedulepb.Message_Header{ @@ -314,7 +334,18 @@ func TestAgentHandleMessageHeartbeat(t *testing.T) { MsgType: schedulepb.MsgHeartbeat, From: "owner-1", Heartbeat: &schedulepb.Heartbeat{ - TableIDs: []model.TableID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + Spans: []tablepb.Span{ + spanz.TableIDToComparableSpan(0), + spanz.TableIDToComparableSpan(1), + spanz.TableIDToComparableSpan(2), + spanz.TableIDToComparableSpan(3), + spanz.TableIDToComparableSpan(4), + spanz.TableIDToComparableSpan(5), + spanz.TableIDToComparableSpan(6), + spanz.TableIDToComparableSpan(7), + spanz.TableIDToComparableSpan(8), + spanz.TableIDToComparableSpan(9), + }, }, } @@ -325,7 +356,7 @@ func TestAgentHandleMessageHeartbeat(t *testing.T) { result := response[0].GetHeartbeatResponse().Tables require.Len(t, result, 10) sort.Slice(result, func(i, j int) bool { - return result[i].TableID < result[j].TableID + return result[i].Span.Less(&result[j].Span) }) require.Equal(t, tablepb.TableStatePreparing, result[0].State) @@ -337,11 +368,11 @@ func TestAgentHandleMessageHeartbeat(t *testing.T) { require.Equal(t, tablepb.TableStateAbsent, result[i].State) } - a.tableM.tables[model.TableID(1)].task = &dispatchTableTask{IsRemove: true} + a.tableM.tables.GetV(spanz.TableIDToComparableSpan(1)).task = &dispatchTableTask{IsRemove: true} response = 
a.handleMessage([]*schedulepb.Message{heartbeat}) result = response[0].GetHeartbeatResponse().Tables sort.Slice(result, func(i, j int) bool { - return result[i].TableID < result[j].TableID + return result[i].Span.TableID < result[j].Span.TableID }) require.Equal(t, tablepb.TableStateStopping, result[1].State) @@ -376,12 +407,12 @@ func TestAgentPermuteMessages(t *testing.T) { ProcessorEpoch: a.Epoch, }, MsgType: schedulepb.MsgDispatchTableRequest, - From: a.ownerInfo.CaptureID, + From: a.ownerInfo.ID, To: a.CaptureID, DispatchTableRequest: &schedulepb.DispatchTableRequest{ Request: &schedulepb.DispatchTableRequest_RemoveTable{ RemoveTable: &schedulepb.RemoveTableRequest{ - TableID: 1, + Span: spanz.TableIDToComparableSpan(1), }, }, }, @@ -394,12 +425,12 @@ func TestAgentPermuteMessages(t *testing.T) { ProcessorEpoch: a.Epoch, }, MsgType: schedulepb.MsgDispatchTableRequest, - From: a.ownerInfo.CaptureID, + From: a.ownerInfo.ID, To: a.CaptureID, DispatchTableRequest: &schedulepb.DispatchTableRequest{ Request: &schedulepb.DispatchTableRequest_AddTable{ AddTable: &schedulepb.AddTableRequest{ - TableID: 1, + Span: spanz.TableIDToComparableSpan(1), IsSecondary: isSecondary, }, }, @@ -416,7 +447,7 @@ func TestAgentPermuteMessages(t *testing.T) { MsgType: schedulepb.MsgHeartbeat, From: "owner-1", Heartbeat: &schedulepb.Heartbeat{ - TableIDs: []model.TableID{1}, + Spans: []tablepb.Span{{TableID: 1}}, }, }) @@ -435,15 +466,20 @@ func TestAgentPermuteMessages(t *testing.T) { t.Logf("test %v, %v", state, sequence) switch state { case tablepb.TableStatePreparing: - mockTableExecutor.tables[tableID] = tablepb.TableStatePreparing + mockTableExecutor.tables.ReplaceOrInsert( + spanz.TableIDToComparableSpan(tableID), tablepb.TableStatePreparing) case tablepb.TableStatePrepared: - mockTableExecutor.tables[tableID] = tablepb.TableStatePrepared + mockTableExecutor.tables.ReplaceOrInsert( + spanz.TableIDToComparableSpan(tableID), tablepb.TableStatePrepared) case 
tablepb.TableStateReplicating: - mockTableExecutor.tables[tableID] = tablepb.TableStateReplicating + mockTableExecutor.tables.ReplaceOrInsert( + spanz.TableIDToComparableSpan(tableID), tablepb.TableStateReplicating) case tablepb.TableStateStopping: - mockTableExecutor.tables[tableID] = tablepb.TableStateStopping + mockTableExecutor.tables.ReplaceOrInsert( + spanz.TableIDToComparableSpan(tableID), tablepb.TableStateStopping) case tablepb.TableStateStopped: - mockTableExecutor.tables[tableID] = tablepb.TableStateStopped + mockTableExecutor.tables.ReplaceOrInsert( + spanz.TableIDToComparableSpan(tableID), tablepb.TableStateStopped) case tablepb.TableStateAbsent: default: } @@ -529,7 +565,7 @@ func TestAgentHandleMessage(t *testing.T) { OwnerRevision: a.ownerInfo.Revision, }, MsgType: schedulepb.MsgHeartbeat, - From: a.ownerInfo.CaptureID, + From: a.ownerInfo.ID, Heartbeat: &schedulepb.Heartbeat{}, } @@ -545,11 +581,11 @@ func TestAgentHandleMessage(t *testing.T) { ProcessorEpoch: schedulepb.ProcessorEpoch{Epoch: "wrong-agent-epoch-1"}, }, MsgType: schedulepb.MsgDispatchTableRequest, - From: a.ownerInfo.CaptureID, + From: a.ownerInfo.ID, DispatchTableRequest: &schedulepb.DispatchTableRequest{ Request: &schedulepb.DispatchTableRequest_AddTable{ AddTable: &schedulepb.AddTableRequest{ - TableID: 1, + Span: spanz.TableIDToComparableSpan(1), IsSecondary: true, Checkpoint: tablepb.Checkpoint{}, }, @@ -558,13 +594,13 @@ func TestAgentHandleMessage(t *testing.T) { } // wrong epoch, ignored responses := a.handleMessage([]*schedulepb.Message{addTableRequest}) - require.NotContains(t, tableM.tables, model.TableID(1)) + require.False(t, tableM.tables.Has(spanz.TableIDToComparableSpan(1))) require.Len(t, responses, 0) // correct epoch, processing. 
addTableRequest.Header.ProcessorEpoch = a.Epoch _ = a.handleMessage([]*schedulepb.Message{addTableRequest}) - require.Contains(t, tableM.tables, model.TableID(1)) + require.True(t, a.tableM.tables.Has(spanz.TableIDToComparableSpan(1))) heartbeat.Header.OwnerRevision.Revision = 2 response = a.handleMessage([]*schedulepb.Message{heartbeat}) @@ -578,7 +614,7 @@ func TestAgentHandleMessage(t *testing.T) { ProcessorEpoch: a.Epoch, }, MsgType: schedulepb.MsgUnknown, - From: a.ownerInfo.CaptureID, + From: a.ownerInfo.ID, } response = a.handleMessage([]*schedulepb.Message{unknownMessage}) @@ -623,8 +659,8 @@ func TestAgentTick(t *testing.T) { ProcessorEpoch: schedulepb.ProcessorEpoch{}, }, MsgType: schedulepb.MsgHeartbeat, - From: a.ownerInfo.CaptureID, - Heartbeat: &schedulepb.Heartbeat{TableIDs: nil}, + From: a.ownerInfo.ID, + Heartbeat: &schedulepb.Heartbeat{Spans: nil}, } // receive first heartbeat from the owner @@ -637,7 +673,7 @@ func TestAgentTick(t *testing.T) { trans.SendBuffer = trans.SendBuffer[:0] require.Equal(t, schedulepb.MsgHeartbeatResponse, heartbeatResponse.MsgType) - require.Equal(t, a.ownerInfo.CaptureID, heartbeatResponse.To) + require.Equal(t, a.ownerInfo.ID, heartbeatResponse.To) require.Equal(t, a.CaptureID, heartbeatResponse.From) addTableRequest := &schedulepb.Message{ @@ -647,11 +683,11 @@ func TestAgentTick(t *testing.T) { ProcessorEpoch: a.Epoch, }, MsgType: schedulepb.MsgDispatchTableRequest, - From: a.ownerInfo.CaptureID, + From: a.ownerInfo.ID, DispatchTableRequest: &schedulepb.DispatchTableRequest{ Request: &schedulepb.DispatchTableRequest_AddTable{ AddTable: &schedulepb.AddTableRequest{ - TableID: 1, + Span: spanz.TableIDToComparableSpan(1), IsSecondary: true, Checkpoint: tablepb.Checkpoint{}, }, @@ -666,11 +702,11 @@ func TestAgentTick(t *testing.T) { ProcessorEpoch: a.Epoch, }, MsgType: schedulepb.MsgDispatchTableRequest, - From: a.ownerInfo.CaptureID, + From: a.ownerInfo.ID, DispatchTableRequest: &schedulepb.DispatchTableRequest{ 
Request: &schedulepb.DispatchTableRequest_RemoveTable{ RemoveTable: &schedulepb.RemoveTableRequest{ - TableID: 2, + Span: tablepb.Span{TableID: 2}, }, }, }, @@ -721,7 +757,7 @@ func TestAgentHandleLivenessUpdate(t *testing.T) { ProcessorEpoch: a.Epoch, }, MsgType: schedulepb.MsgHeartbeat, - From: a.ownerInfo.CaptureID, + From: a.ownerInfo.ID, Heartbeat: &schedulepb.Heartbeat{ IsStopping: true, }, @@ -753,7 +789,7 @@ func TestAgentCommitAddTableDuringStopping(t *testing.T) { DispatchTableRequest: &schedulepb.DispatchTableRequest{ Request: &schedulepb.DispatchTableRequest_AddTable{ AddTable: &schedulepb.AddTableRequest{ - TableID: 1, + Span: spanz.TableIDToComparableSpan(1), IsSecondary: true, }, }, @@ -796,7 +832,7 @@ func TestAgentCommitAddTableDuringStopping(t *testing.T) { DispatchTableRequest: &schedulepb.DispatchTableRequest{ Request: &schedulepb.DispatchTableRequest_AddTable{ AddTable: &schedulepb.AddTableRequest{ - TableID: 1, + Span: spanz.TableIDToComparableSpan(1), IsSecondary: false, }, }, @@ -829,84 +865,173 @@ func TestAgentCommitAddTableDuringStopping(t *testing.T) { require.Equal(t, tablepb.TableStateReplicating, addTableResp.Status.State) } +func TestAgentTransportCompat(t *testing.T) { + t.Parallel() + + a := newAgent4Test() + mockTableExecutor := newMockTableExecutor() + a.tableM = newTableManager(model.ChangeFeedID{}, mockTableExecutor) + trans := transport.NewMockTrans() + a.trans = trans + a.compat = compat.New(&config.SchedulerConfig{ + RegionPerSpan: 1, + }, map[model.CaptureID]*model.CaptureInfo{}) + ctx := context.Background() + + // Disable span replication. + a.handleOwnerInfo("a", a.ownerInfo.Revision.Revision+1, "4.0.0") + require.False(t, a.compat.CheckSpanReplicationEnabled()) + + // Test compat.BeforeTransportSend. 
+ a.sendMsgs( + ctx, []*schedulepb.Message{{ + Header: &schedulepb.Message_Header{ + Version: a.Version, + ProcessorEpoch: a.Epoch, + }, + From: a.CaptureID, To: "a", MsgType: schedulepb.MsgDispatchTableResponse, + DispatchTableResponse: &schedulepb.DispatchTableResponse{ + Response: &schedulepb.DispatchTableResponse_AddTable{ + AddTable: &schedulepb.AddTableResponse{ + Status: &tablepb.TableStatus{ + Span: spanz.TableIDToComparableSpan(1), + }, + }, + }, + }, + }}) + require.EqualValues(t, []*schedulepb.Message{{ + Header: &schedulepb.Message_Header{ + Version: a.Version, + ProcessorEpoch: a.Epoch, + OwnerRevision: a.ownerInfo.Revision, + }, + From: a.CaptureID, To: "a", MsgType: schedulepb.MsgDispatchTableResponse, + DispatchTableResponse: &schedulepb.DispatchTableResponse{ + Response: &schedulepb.DispatchTableResponse_AddTable{ + AddTable: &schedulepb.AddTableResponse{ + Status: &tablepb.TableStatus{ + TableID: 1, + Span: spanz.TableIDToComparableSpan(1), + }, + }, + }, + }, + }}, trans.SendBuffer) + // Test compat.AfterTransportReceive. 
+ trans.RecvBuffer = append(trans.RecvBuffer, &schedulepb.Message{ + Header: &schedulepb.Message_Header{ + Version: a.Version, + OwnerRevision: a.ownerInfo.Revision, + }, + From: "a", To: a.CaptureID, MsgType: schedulepb.MsgDispatchTableRequest, + DispatchTableRequest: &schedulepb.DispatchTableRequest{ + Request: &schedulepb.DispatchTableRequest_AddTable{ + AddTable: &schedulepb.AddTableRequest{ + TableID: 1, + }, + }, + }, + }) + msgs, err := a.recvMsgs(ctx) + require.NoError(t, err) + require.EqualValues(t, []*schedulepb.Message{{ + Header: &schedulepb.Message_Header{ + Version: a.Version, + OwnerRevision: a.ownerInfo.Revision, + }, + From: "a", To: a.CaptureID, MsgType: schedulepb.MsgDispatchTableRequest, + DispatchTableRequest: &schedulepb.DispatchTableRequest{ + Request: &schedulepb.DispatchTableRequest_AddTable{ + AddTable: &schedulepb.AddTableRequest{ + TableID: 1, + Span: spanz.TableIDToComparableSpan(1), + }, + }, + }, + }}, msgs) +} + // MockTableExecutor is a mock implementation of TableExecutor. type MockTableExecutor struct { mock.Mock // it's preferred to use `pipeline.MockPipeline` here to make the test more vivid. - tables map[model.TableID]tablepb.TableState + tables *spanz.Map[tablepb.TableState] } +var _ internal.TableExecutor = (*MockTableExecutor)(nil) + // newMockTableExecutor creates a new mock table executor. func newMockTableExecutor() *MockTableExecutor { return &MockTableExecutor{ - tables: map[model.TableID]tablepb.TableState{}, + tables: spanz.NewMap[tablepb.TableState](), } } // AddTable adds a table to the executor. 
func (e *MockTableExecutor) AddTable( - ctx context.Context, tableID model.TableID, startTs model.Ts, isPrepare bool, + ctx context.Context, tableID tablepb.Span, startTs model.Ts, isPrepare bool, ) (bool, error) { log.Info("AddTable", - zap.Int64("tableID", tableID), + zap.Stringer("span", &tableID), zap.Any("startTs", startTs), zap.Bool("isPrepare", isPrepare)) - state, ok := e.tables[tableID] + state, ok := e.tables.Get(tableID) if ok { switch state { case tablepb.TableStatePreparing: return true, nil case tablepb.TableStatePrepared: if !isPrepare { - e.tables[tableID] = tablepb.TableStateReplicating + e.tables.ReplaceOrInsert(tableID, tablepb.TableStateReplicating) } return true, nil case tablepb.TableStateReplicating: return true, nil case tablepb.TableStateStopped: - delete(e.tables, tableID) + e.tables.Delete(tableID) } } args := e.Called(ctx, tableID, startTs, isPrepare) if args.Bool(0) { - e.tables[tableID] = tablepb.TableStatePreparing + e.tables.ReplaceOrInsert(tableID, tablepb.TableStatePreparing) } return args.Bool(0), args.Error(1) } // IsAddTableFinished determines if the table has been added. 
-func (e *MockTableExecutor) IsAddTableFinished(tableID model.TableID, isPrepare bool) bool { - _, ok := e.tables[tableID] +func (e *MockTableExecutor) IsAddTableFinished(tableID tablepb.Span, isPrepare bool) bool { + _, ok := e.tables.Get(tableID) if !ok { log.Panic("table which was added is not found", - zap.Int64("tableID", tableID), + zap.Stringer("span", &tableID), zap.Bool("isPrepare", isPrepare)) } args := e.Called(tableID, isPrepare) if args.Bool(0) { - e.tables[tableID] = tablepb.TableStatePrepared + e.tables.ReplaceOrInsert(tableID, tablepb.TableStatePrepared) if !isPrepare { - e.tables[tableID] = tablepb.TableStateReplicating + e.tables.ReplaceOrInsert(tableID, tablepb.TableStateReplicating) } return true } - e.tables[tableID] = tablepb.TableStatePreparing + e.tables.ReplaceOrInsert(tableID, tablepb.TableStatePreparing) if !isPrepare { - e.tables[tableID] = tablepb.TableStatePrepared + e.tables.ReplaceOrInsert(tableID, tablepb.TableStatePrepared) } return false } // RemoveTable removes a table from the executor. 
-func (e *MockTableExecutor) RemoveTable(tableID model.TableID) bool { - state, ok := e.tables[tableID] +func (e *MockTableExecutor) RemoveTable(tableID tablepb.Span) bool { + state, ok := e.tables.Get(tableID) if !ok { - log.Warn("table to be remove is not found", zap.Int64("tableID", tableID)) + log.Warn("table to be remove is not found", zap.Stringer("span", &tableID)) return true } switch state { @@ -916,44 +1041,45 @@ func (e *MockTableExecutor) RemoveTable(tableID model.TableID) bool { default: } // the current `processor implementation, does not consider table's state - log.Info("RemoveTable", zap.Int64("tableID", tableID), zap.Any("state", state)) + log.Info("RemoveTable", zap.Stringer("span", &tableID), zap.Any("state", state)) args := e.Called(tableID) if args.Bool(0) { - e.tables[tableID] = tablepb.TableStateStopped + e.tables.ReplaceOrInsert(tableID, tablepb.TableStateStopped) } return args.Bool(0) } // IsRemoveTableFinished determines if the table has been removed. -func (e *MockTableExecutor) IsRemoveTableFinished(tableID model.TableID) (model.Ts, bool) { - state, ok := e.tables[tableID] +func (e *MockTableExecutor) IsRemoveTableFinished(tableID tablepb.Span) (model.Ts, bool) { + state, ok := e.tables.Get(tableID) if !ok { // the real `table executor` processor, would panic in such case. log.Warn("table to be removed is not found", - zap.Int64("tableID", tableID)) + zap.Stringer("span", &tableID)) return 0, true } args := e.Called(tableID) if args.Bool(1) { log.Info("remove table finished, remove it from the executor", - zap.Int64("tableID", tableID), zap.Any("state", state)) - delete(e.tables, tableID) + zap.Stringer("span", &tableID), zap.Any("state", state)) + e.tables.Delete(tableID) } else { // revert the state back to old state, assume it's `replicating`, // but `preparing` / `prepared` can also be removed. 
- e.tables[tableID] = tablepb.TableStateReplicating + e.tables.ReplaceOrInsert(tableID, tablepb.TableStateReplicating) } return model.Ts(args.Int(0)), args.Bool(1) } // GetAllCurrentTables returns all tables that are currently being adding, running, or removing. -func (e *MockTableExecutor) GetAllCurrentTables() []model.TableID { - var result []model.TableID - for tableID := range e.tables { - result = append(result, tableID) - } +func (e *MockTableExecutor) GetAllCurrentTables() int { + var result int + e.tables.Ascend(func(span tablepb.Span, value tablepb.TableState) bool { + result++ + return true + }) return result } @@ -964,13 +1090,13 @@ func (e *MockTableExecutor) GetCheckpoint() (checkpointTs, resolvedTs model.Ts) } // GetTableStatus implements TableExecutor interface -func (e *MockTableExecutor) GetTableStatus(tableID model.TableID) tablepb.TableStatus { - state, ok := e.tables[tableID] +func (e *MockTableExecutor) GetTableStatus(span tablepb.Span) tablepb.TableStatus { + state, ok := e.tables.Get(span) if !ok { state = tablepb.TableStateAbsent } return tablepb.TableStatus{ - TableID: tableID, - State: state, + Span: span, + State: state, } } diff --git a/cdc/scheduler/internal/v3/agent/table.go b/cdc/scheduler/internal/v3/agent/table.go index a1d933ae174..a41b95fce02 100644 --- a/cdc/scheduler/internal/v3/agent/table.go +++ b/cdc/scheduler/internal/v3/agent/table.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/tiflow/cdc/processor/tablepb" "github.com/pingcap/tiflow/cdc/scheduler/internal" "github.com/pingcap/tiflow/cdc/scheduler/schedulepb" + "github.com/pingcap/tiflow/pkg/spanz" "go.uber.org/zap" ) @@ -29,7 +30,7 @@ import ( // also tracking its progress by utilize the `TableExecutor` type table struct { changefeedID model.ChangeFeedID - id model.TableID + span tablepb.Span state tablepb.TableState executor internal.TableExecutor @@ -38,11 +39,11 @@ type table struct { } func newTable( - changefeed model.ChangeFeedID, tableID model.TableID, executor 
internal.TableExecutor, + changefeed model.ChangeFeedID, span tablepb.Span, executor internal.TableExecutor, ) *table { return &table{ changefeedID: changefeed, - id: tableID, + span: span, state: tablepb.TableStateAbsent, // use `absent` as the default state. executor: executor, task: nil, @@ -53,14 +54,14 @@ func newTable( func (t *table) getAndUpdateTableState() (tablepb.TableState, bool) { oldState := t.state - meta := t.executor.GetTableStatus(t.id) + meta := t.executor.GetTableStatus(t.span) t.state = meta.State if oldState != t.state { log.Debug("schedulerv3: table state changed", zap.String("namespace", t.changefeedID.Namespace), zap.String("changefeed", t.changefeedID.ID), - zap.Int64("tableID", t.id), + zap.Int64("tableID", t.span.TableID), zap.Stringer("oldState", oldState), zap.Stringer("state", t.state)) return t.state, true @@ -69,7 +70,7 @@ func (t *table) getAndUpdateTableState() (tablepb.TableState, bool) { } func (t *table) getTableStatus() tablepb.TableStatus { - return t.executor.GetTableStatus(t.id) + return t.executor.GetTableStatus(t.span) } func newAddTableResponseMessage(status tablepb.TableStatus) *schedulepb.Message { @@ -111,14 +112,14 @@ func (t *table) handleRemoveTableTask() *schedulepb.Message { log.Warn("schedulerv3: remove table, but table is absent", zap.String("namespace", t.changefeedID.Namespace), zap.String("changefeed", t.changefeedID.ID), - zap.Int64("tableID", t.id)) + zap.Int64("tableID", t.span.TableID)) t.task = nil return newRemoveTableResponseMessage(t.getTableStatus()) case tablepb.TableStateStopping, // stopping now is useless tablepb.TableStateStopped: // release table resource, and get the latest checkpoint // this will let the table become `absent` - checkpointTs, done := t.executor.IsRemoveTableFinished(t.id) + checkpointTs, done := t.executor.IsRemoveTableFinished(t.span) if !done { // actually, this should never be hit, since we know that table is stopped. 
status := t.getTableStatus() @@ -133,7 +134,7 @@ func (t *table) handleRemoveTableTask() *schedulepb.Message { case tablepb.TableStatePreparing, tablepb.TableStatePrepared, tablepb.TableStateReplicating: - done := t.executor.RemoveTable(t.task.TableID) + done := t.executor.RemoveTable(t.task.Span) if !done { status := t.getTableStatus() status.State = tablepb.TableStateStopping @@ -144,7 +145,7 @@ func (t *table) handleRemoveTableTask() *schedulepb.Message { log.Panic("schedulerv3: unknown table state", zap.String("namespace", t.changefeedID.Namespace), zap.String("changefeed", t.changefeedID.ID), - zap.Int64("tableID", t.id), zap.Stringer("state", state)) + zap.Int64("tableID", t.span.TableID), zap.Stringer("state", state)) } } return nil @@ -156,12 +157,12 @@ func (t *table) handleAddTableTask(ctx context.Context) (result *schedulepb.Mess for changed { switch state { case tablepb.TableStateAbsent: - done, err := t.executor.AddTable(ctx, t.task.TableID, t.task.StartTs, t.task.IsPrepare) + done, err := t.executor.AddTable(ctx, t.task.Span, t.task.StartTs, t.task.IsPrepare) if err != nil || !done { log.Warn("schedulerv3: agent add table failed", zap.String("namespace", t.changefeedID.Namespace), zap.String("changefeed", t.changefeedID.ID), - zap.Int64("tableID", t.id), zap.Any("task", t.task), + zap.Int64("tableID", t.span.TableID), zap.Any("task", t.task), zap.Error(err)) status := t.getTableStatus() return newAddTableResponseMessage(status), errors.Trace(err) @@ -171,7 +172,7 @@ func (t *table) handleAddTableTask(ctx context.Context) (result *schedulepb.Mess log.Info("schedulerv3: table is replicating", zap.String("namespace", t.changefeedID.Namespace), zap.String("changefeed", t.changefeedID.ID), - zap.Int64("tableID", t.id), zap.Stringer("state", state)) + zap.Int64("tableID", t.span.TableID), zap.Stringer("state", state)) t.task = nil status := t.getTableStatus() return newAddTableResponseMessage(status), nil @@ -181,18 +182,18 @@ func (t *table) 
handleAddTableTask(ctx context.Context) (result *schedulepb.Mess log.Info("schedulerv3: table is prepared", zap.String("namespace", t.changefeedID.Namespace), zap.String("changefeed", t.changefeedID.ID), - zap.Int64("tableID", t.id), zap.Stringer("state", state)) + zap.Int64("tableID", t.span.TableID), zap.Stringer("state", state)) t.task = nil return newAddTableResponseMessage(t.getTableStatus()), nil } if t.task.status == dispatchTableTaskReceived { - done, err := t.executor.AddTable(ctx, t.task.TableID, t.task.StartTs, false) + done, err := t.executor.AddTable(ctx, t.task.Span, t.task.StartTs, false) if err != nil || !done { log.Warn("schedulerv3: agent add table failed", zap.String("namespace", t.changefeedID.Namespace), zap.String("changefeed", t.changefeedID.ID), - zap.Int64("tableID", t.id), zap.Stringer("state", state), + zap.Int64("tableID", t.span.TableID), zap.Stringer("state", state), zap.Error(err)) status := t.getTableStatus() return newAddTableResponseMessage(status), errors.Trace(err) @@ -200,7 +201,7 @@ func (t *table) handleAddTableTask(ctx context.Context) (result *schedulepb.Mess t.task.status = dispatchTableTaskProcessed } - done := t.executor.IsAddTableFinished(t.task.TableID, false) + done := t.executor.IsAddTableFinished(t.task.Span, false) if !done { return newAddTableResponseMessage(t.getTableStatus()), nil } @@ -208,7 +209,7 @@ func (t *table) handleAddTableTask(ctx context.Context) (result *schedulepb.Mess case tablepb.TableStatePreparing: // `preparing` is not stable state and would last a long time, // it's no need to return such a state, to make the coordinator become burdensome. 
- done := t.executor.IsAddTableFinished(t.task.TableID, t.task.IsPrepare) + done := t.executor.IsAddTableFinished(t.task.Span, t.task.IsPrepare) if !done { return nil, nil } @@ -216,20 +217,20 @@ func (t *table) handleAddTableTask(ctx context.Context) (result *schedulepb.Mess log.Info("schedulerv3: add table finished", zap.String("namespace", t.changefeedID.Namespace), zap.String("changefeed", t.changefeedID.ID), - zap.Int64("tableID", t.id), zap.Stringer("state", state)) + zap.Int64("tableID", t.span.TableID), zap.Stringer("state", state)) case tablepb.TableStateStopping, tablepb.TableStateStopped: log.Warn("schedulerv3: ignore add table", zap.String("namespace", t.changefeedID.Namespace), zap.String("changefeed", t.changefeedID.ID), - zap.Int64("tableID", t.id)) + zap.Int64("tableID", t.span.TableID)) t.task = nil return newAddTableResponseMessage(t.getTableStatus()), nil default: log.Panic("schedulerv3: unknown table state", zap.String("namespace", t.changefeedID.Namespace), zap.String("changefeed", t.changefeedID.ID), - zap.Int64("tableID", t.id)) + zap.Int64("tableID", t.span.TableID)) } } @@ -237,18 +238,18 @@ func (t *table) handleAddTableTask(ctx context.Context) (result *schedulepb.Mess } func (t *table) injectDispatchTableTask(task *dispatchTableTask) { - if t.id != task.TableID { + if !t.span.Eq(&task.Span) { log.Panic("schedulerv3: tableID not match", zap.String("namespace", t.changefeedID.Namespace), zap.String("changefeed", t.changefeedID.ID), - zap.Int64("tableID", t.id), - zap.Int64("task.TableID", task.TableID)) + zap.Int64("tableID", t.span.TableID), + zap.Stringer("task.TableID", &task.Span)) } if t.task == nil { log.Info("schedulerv3: table found new task", zap.String("namespace", t.changefeedID.Namespace), zap.String("changefeed", t.changefeedID.ID), - zap.Int64("tableID", t.id), + zap.Int64("tableID", t.span.TableID), zap.Any("task", task)) t.task = task return @@ -257,7 +258,7 @@ func (t *table) injectDispatchTableTask(task 
*dispatchTableTask) { "since there is one not finished yet", zap.String("namespace", t.changefeedID.Namespace), zap.String("changefeed", t.changefeedID.ID), - zap.Int64("tableID", t.id), + zap.Int64("tableID", t.span.TableID), zap.Any("nowTask", t.task), zap.Any("ignoredTask", task)) } @@ -273,7 +274,7 @@ func (t *table) poll(ctx context.Context) (*schedulepb.Message, error) { } type tableManager struct { - tables map[model.TableID]*table + tables *spanz.Map[*table] executor internal.TableExecutor changefeedID model.ChangeFeedID @@ -283,7 +284,7 @@ func newTableManager( changefeed model.ChangeFeedID, executor internal.TableExecutor, ) *tableManager { return &tableManager{ - tables: make(map[model.TableID]*table), + tables: spanz.NewMap[*table](), executor: executor, changefeedID: changefeed, } @@ -291,54 +292,61 @@ func newTableManager( func (tm *tableManager) poll(ctx context.Context) ([]*schedulepb.Message, error) { result := make([]*schedulepb.Message, 0) - for tableID, table := range tm.tables { + var pollErr error + toBeDropped := []tablepb.Span{} + tm.tables.Ascend(func(span tablepb.Span, table *table) bool { message, err := table.poll(ctx) if err != nil { - return result, errors.Trace(err) + pollErr = errors.Trace(err) + return false } state, _ := table.getAndUpdateTableState() if state == tablepb.TableStateAbsent { - tm.dropTable(tableID) + toBeDropped = append(toBeDropped, span) } if message == nil { - continue + return true } result = append(result, message) + return true + }) + for _, span := range toBeDropped { + tm.dropTable(span) } - return result, nil + return result, pollErr } -func (tm *tableManager) getAllTables() map[model.TableID]*table { +func (tm *tableManager) getAllTables() *spanz.Map[*table] { return tm.tables } // addTable add the target table, and return it. 
-func (tm *tableManager) addTable(tableID model.TableID) *table { - table, ok := tm.tables[tableID] +func (tm *tableManager) addTable(span tablepb.Span) *table { + table, ok := tm.tables.Get(span) if !ok { - table = newTable(tm.changefeedID, tableID, tm.executor) - tm.tables[tableID] = table + table = newTable(tm.changefeedID, span, tm.executor) + tm.tables.ReplaceOrInsert(span, table) } return table } -func (tm *tableManager) getTable(tableID model.TableID) (*table, bool) { - table, ok := tm.tables[tableID] +func (tm *tableManager) getTable(span tablepb.Span) (*table, bool) { + table, ok := tm.tables.Get(span) if ok { return table, true } return nil, false } -func (tm *tableManager) dropTable(tableID model.TableID) { - table, ok := tm.tables[tableID] +func (tm *tableManager) dropTable(span tablepb.Span) { + table, ok := tm.tables.Get(span) if !ok { log.Warn("schedulerv3: tableManager drop table not found", zap.String("namespace", tm.changefeedID.Namespace), zap.String("changefeed", tm.changefeedID.ID), - zap.Int64("tableID", tableID)) + zap.Stringer("span", &span)) return } state, _ := table.getAndUpdateTableState() @@ -346,25 +354,25 @@ func (tm *tableManager) dropTable(tableID model.TableID) { log.Panic("schedulerv3: tableManager drop table undesired", zap.String("namespace", tm.changefeedID.Namespace), zap.String("changefeed", tm.changefeedID.ID), - zap.Int64("tableID", tableID), + zap.Stringer("span", &span), zap.Stringer("state", table.state)) } log.Debug("schedulerv3: tableManager drop table", zap.String("namespace", tm.changefeedID.Namespace), zap.String("changefeed", tm.changefeedID.ID), - zap.Int64("tableID", tableID)) - delete(tm.tables, tableID) + zap.Stringer("span", &span)) + tm.tables.Delete(span) } -func (tm *tableManager) getTableStatus(tableID model.TableID) tablepb.TableStatus { - table, ok := tm.getTable(tableID) +func (tm *tableManager) getTableStatus(span tablepb.Span) tablepb.TableStatus { + table, ok := tm.getTable(span) if ok { return 
table.getTableStatus() } return tablepb.TableStatus{ - TableID: tableID, - State: tablepb.TableStateAbsent, + Span: span, + State: tablepb.TableStateAbsent, } } diff --git a/cdc/scheduler/internal/v3/agent/table_test.go b/cdc/scheduler/internal/v3/agent/table_test.go index 077ede1dcb5..98eb9ee62d7 100644 --- a/cdc/scheduler/internal/v3/agent/table_test.go +++ b/cdc/scheduler/internal/v3/agent/table_test.go @@ -18,6 +18,7 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/processor/tablepb" + "github.com/pingcap/tiflow/pkg/spanz" "github.com/stretchr/testify/require" ) @@ -29,9 +30,10 @@ func TestTableManager(t *testing.T) { tableM := newTableManager(model.ChangeFeedID{}, mockTableExecutor) - tableM.addTable(model.TableID(1)) - require.Equal(t, tablepb.TableStateAbsent, tableM.tables[model.TableID(1)].state) + span1 := spanz.TableIDToComparableSpan(1) + tableM.addTable(span1) + require.Equal(t, tablepb.TableStateAbsent, tableM.tables.GetV(span1).state) - tableM.dropTable(model.TableID(1)) - require.NotContains(t, tableM.tables, model.TableID(1)) + tableM.dropTable(span1) + require.False(t, tableM.tables.Has(span1)) } diff --git a/cdc/scheduler/rexport.go b/cdc/scheduler/rexport.go index fe821e915f9..6a6f1b18f95 100644 --- a/cdc/scheduler/rexport.go +++ b/cdc/scheduler/rexport.go @@ -69,11 +69,11 @@ func NewAgent( etcdClient etcd.CDCEtcdClient, executor TableExecutor, changefeedID model.ChangeFeedID, + cfg *config.SchedulerConfig, ) (Agent, error) { return v3agent.NewAgent( ctx, captureID, liveness, changefeedID, - messageServer, messageRouter, etcdClient, executor, - ) + messageServer, messageRouter, etcdClient, executor, cfg) } // NewScheduler returns two-phase scheduler.