From 56d985b62453b9532101445b69e74f5307a10b26 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Fri, 2 Dec 2022 16:40:50 +0800 Subject: [PATCH 1/4] cdc: refactor TableExecutor interface and agent * Replace TableID with tablepb.Span in TableExecutor * Replace TableID with tablepb.Span in agent Signed-off-by: Neil Shen --- cdc/processor/manager.go | 2 +- cdc/processor/processor.go | 130 +++---- cdc/processor/processor_test.go | 73 ++-- cdc/scheduler/internal/table_executor.go | 12 +- cdc/scheduler/internal/v3/agent/agent.go | 95 +++-- .../internal/v3/agent/agent_bench_test.go | 10 +- cdc/scheduler/internal/v3/agent/agent_test.go | 328 ++++++++++++------ cdc/scheduler/internal/v3/agent/table.go | 108 +++--- cdc/scheduler/internal/v3/agent/table_test.go | 10 +- cdc/scheduler/rexport.go | 4 +- 10 files changed, 476 insertions(+), 296 deletions(-) diff --git a/cdc/processor/manager.go b/cdc/processor/manager.go index 4780e45120d..f6e62e1bc37 100644 --- a/cdc/processor/manager.go +++ b/cdc/processor/manager.go @@ -243,7 +243,7 @@ func (m *managerImpl) handleCommand(ctx cdcContext.Context) error { case commandTpQueryTableCount: count := 0 for _, p := range m.processors { - count += len(p.GetAllCurrentTables()) + count += p.GetAllCurrentTables() } select { case cmd.payload.(chan int) <- count: diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index be964ca54d6..47e503808fc 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -118,13 +118,15 @@ func (p *processor) checkReadyForMessages() bool { return p.changefeed != nil && p.changefeed.Status != nil } +var _ scheduler.TableExecutor = (*processor)(nil) + // AddTable implements TableExecutor interface. // AddTable may cause by the following scenario // 1. `Create Table`, a new table dispatched to the processor, `isPrepare` should be false // 2. Prepare phase for 2 phase scheduling, `isPrepare` should be true. // 3. Replicating phase for 2 phase scheduling, `isPrepare` should be false func (p *processor) AddTable( - ctx context.Context, tableID model.TableID, startTs model.Ts, isPrepare bool, + ctx context.Context, span tablepb.Span, startTs model.Ts, isPrepare bool, ) (bool, error) { if !p.checkReadyForMessages() { return false, nil @@ -135,7 +137,7 @@ func (p *processor) AddTable( zap.String("captureID", p.captureInfo.ID), zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), - zap.Int64("tableID", tableID), + zap.Int64("tableID", span.TableID), zap.Uint64("checkpointTs", startTs), zap.Bool("isPrepare", isPrepare)) } @@ -143,9 +145,9 @@ func (p *processor) AddTable( var alreadyExist bool var state tablepb.TableState if p.pullBasedSinking { - state, alreadyExist = p.sinkManager.GetTableState(tableID) + state, alreadyExist = p.sinkManager.GetTableState(span.TableID) } else { - table, ok := p.tables[tableID] + table, ok := p.tables[span.TableID] if ok { alreadyExist = true state = table.State() @@ -161,7 +163,7 @@ func (p *processor) AddTable( zap.String("captureID", p.captureInfo.ID), zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), - zap.Int64("tableID", tableID), + zap.Int64("tableID", span.TableID), zap.Uint64("checkpointTs", startTs), zap.Bool("isPrepare", isPrepare)) return true, nil @@ -170,11 +172,11 @@ func (p *processor) AddTable( // be stopped on original capture already, it's safe to start replicating data now. if !isPrepare { if p.pullBasedSinking { - if err := p.sinkManager.StartTable(tableID, startTs); err != nil { + if err := p.sinkManager.StartTable(span.TableID, startTs); err != nil { return false, errors.Trace(err) } } else { - p.tables[tableID].Start(startTs) + p.tables[span.TableID].Start(startTs) } } return true, nil @@ -183,7 +185,7 @@ func (p *processor) AddTable( zap.String("captureID", p.captureInfo.ID), zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), - zap.Int64("tableID", tableID), + zap.Int64("tableID", span.TableID), zap.Uint64("checkpointTs", startTs), zap.Bool("isPrepare", isPrepare)) return true, nil @@ -192,10 +194,10 @@ func (p *processor) AddTable( zap.String("captureID", p.captureInfo.ID), zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), - zap.Int64("tableID", tableID), + zap.Int64("tableID", span.TableID), zap.Uint64("checkpointTs", startTs), zap.Bool("isPrepare", isPrepare)) - p.removeTable(p.tables[tableID], tableID) + p.removeTable(p.tables[span.TableID], span.TableID) } } @@ -208,36 +210,37 @@ func (p *processor) AddTable( zap.String("captureID", p.captureInfo.ID), zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), - zap.Int64("tableID", tableID), + zap.Int64("tableID", span.TableID), zap.Uint64("checkpointTs", startTs), zap.Bool("isPrepare", isPrepare)) } if p.pullBasedSinking { - p.sourceManager.AddTable(ctx.(cdcContext.Context), tableID, p.getTableName(ctx, tableID), startTs) + p.sourceManager.AddTable( + ctx.(cdcContext.Context), span.TableID, p.getTableName(ctx, span.TableID), startTs) if p.redoManager.Enabled() { - p.redoManager.AddTable(tableID, startTs) + p.redoManager.AddTable(span.TableID, startTs) } - p.sinkManager.AddTable(tableID, startTs, p.changefeed.Info.TargetTs) + p.sinkManager.AddTable(span.TableID, startTs, p.changefeed.Info.TargetTs) if !isPrepare { - if err := p.sinkManager.StartTable(tableID, startTs); err != nil { + if err := p.sinkManager.StartTable(span.TableID, startTs); err != nil { return false, errors.Trace(err) } } } else { table, err := p.createTablePipeline( - ctx.(cdcContext.Context), tableID, &model.TableReplicaInfo{StartTs: startTs}) + ctx.(cdcContext.Context), span.TableID, &model.TableReplicaInfo{StartTs: startTs}) if err != nil { return false, errors.Trace(err) } - p.tables[tableID] = table + p.tables[span.TableID] = table if !isPrepare { table.Start(startTs) log.Debug("start table", zap.String("captureID", p.captureInfo.ID), zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), - zap.Int64("tableID", tableID), + zap.Int64("tableID", span.TableID), zap.Uint64("startTs", startTs)) } } @@ -246,31 +249,31 @@ func (p *processor) AddTable( } // RemoveTable implements TableExecutor interface. -func (p *processor) RemoveTable(tableID model.TableID) bool { +func (p *processor) RemoveTable(span tablepb.Span) bool { if !p.checkReadyForMessages() { return false } if p.pullBasedSinking { - _, exist := p.sinkManager.GetTableState(tableID) + _, exist := p.sinkManager.GetTableState(span.TableID) if !exist { log.Warn("Table which will be deleted is not found", zap.String("capture", p.captureInfo.ID), zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), - zap.Int64("tableID", tableID)) + zap.Int64("tableID", span.TableID)) return true } - p.sinkManager.AsyncStopTable(tableID) + p.sinkManager.AsyncStopTable(span.TableID) return true } - table, ok := p.tables[tableID] + table, ok := p.tables[span.TableID] if !ok { log.Warn("table which will be deleted is not found", zap.String("capture", p.captureInfo.ID), zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), - zap.Int64("tableID", tableID)) + zap.Int64("tableID", span.TableID)) return true } if !table.AsyncStop() { @@ -281,14 +284,14 @@ func (p *processor) RemoveTable(tableID model.TableID) bool { zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), zap.Uint64("checkpointTs", table.CheckpointTs()), - zap.Int64("tableID", tableID)) + zap.Int64("tableID", span.TableID)) return false } return true } // IsAddTableFinished implements TableExecutor interface. -func (p *processor) IsAddTableFinished(tableID model.TableID, isPrepare bool) bool { +func (p *processor) IsAddTableFinished(span tablepb.Span, isPrepare bool) bool { if !p.checkReadyForMessages() { return false } @@ -303,14 +306,14 @@ func (p *processor) IsAddTableFinished(tableID model.TableID, isPrepare bool) bo done := func() bool { var alreadyExist bool if p.pullBasedSinking { - state, alreadyExist = p.sinkManager.GetTableState(tableID) + state, alreadyExist = p.sinkManager.GetTableState(span.TableID) if alreadyExist { - stats := p.sinkManager.GetTableStats(tableID) + stats := p.sinkManager.GetTableStats(span.TableID) tableResolvedTs = stats.ResolvedTs tableCheckpointTs = stats.CheckpointTs } } else { - table, ok := p.tables[tableID] + table, ok := p.tables[span.TableID] if ok { alreadyExist = true state = table.State() @@ -324,7 +327,7 @@ func (p *processor) IsAddTableFinished(tableID model.TableID, isPrepare bool) bo zap.String("captureID", p.captureInfo.ID), zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), - zap.Int64("tableID", tableID), + zap.Int64("tableID", span.TableID), zap.Bool("isPrepare", isPrepare)) } @@ -340,7 +343,7 @@ func (p *processor) IsAddTableFinished(tableID model.TableID, isPrepare bool) bo zap.String("captureID", p.captureInfo.ID), zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), - zap.Int64("tableID", tableID), + zap.Int64("tableID", span.TableID), zap.Uint64("tableResolvedTs", tableResolvedTs), zap.Uint64("localResolvedTs", localResolvedTs), zap.Uint64("globalResolvedTs", globalResolvedTs), @@ -356,7 +359,7 @@ func (p *processor) IsAddTableFinished(tableID model.TableID, isPrepare bool) bo zap.String("captureID", p.captureInfo.ID), zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), - zap.Int64("tableID", tableID), + zap.Int64("tableID", span.TableID), zap.Uint64("tableResolvedTs", tableResolvedTs), zap.Uint64("localResolvedTs", localResolvedTs), zap.Uint64("globalResolvedTs", globalResolvedTs), @@ -369,7 +372,7 @@ func (p *processor) IsAddTableFinished(tableID model.TableID, isPrepare bool) bo } // IsRemoveTableFinished implements TableExecutor interface. -func (p *processor) IsRemoveTableFinished(tableID model.TableID) (model.Ts, bool) { +func (p *processor) IsRemoveTableFinished(span tablepb.Span) (model.Ts, bool) { if !p.checkReadyForMessages() { return 0, false } @@ -378,13 +381,13 @@ func (p *processor) IsRemoveTableFinished(tableID model.TableID) (model.Ts, bool var state tablepb.TableState var tableCheckpointTs uint64 if p.pullBasedSinking { - state, alreadyExist = p.sinkManager.GetTableState(tableID) + state, alreadyExist = p.sinkManager.GetTableState(span.TableID) if alreadyExist { - stats := p.sinkManager.GetTableStats(tableID) + stats := p.sinkManager.GetTableStats(span.TableID) tableCheckpointTs = stats.CheckpointTs } } else { - table, ok := p.tables[tableID] + table, ok := p.tables[span.TableID] if ok { alreadyExist = true state = table.State() @@ -397,7 +400,7 @@ func (p *processor) IsRemoveTableFinished(tableID model.TableID) (model.Ts, bool zap.String("captureID", p.captureInfo.ID), zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), - zap.Int64("tableID", tableID)) + zap.Int64("tableID", span.TableID)) return 0, true } @@ -407,53 +410,49 @@ func (p *processor) IsRemoveTableFinished(tableID model.TableID) (model.Ts, bool zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), zap.Uint64("checkpointTs", tableCheckpointTs), - zap.Int64("tableID", tableID), + zap.Int64("tableID", span.TableID), zap.Any("tableStatus", state)) return 0, false } if p.pullBasedSinking { - stats := p.sinkManager.GetTableStats(tableID) - p.sourceManager.RemoveTable(tableID) - p.sinkManager.RemoveTable(tableID) + stats := p.sinkManager.GetTableStats(span.TableID) + p.sourceManager.RemoveTable(span.TableID) + p.sinkManager.RemoveTable(span.TableID) if p.redoManager.Enabled() { - p.redoManager.RemoveTable(tableID) + p.redoManager.RemoveTable(span.TableID) } log.Info("table removed", zap.String("captureID", p.captureInfo.ID), zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), - zap.Int64("tableID", tableID), + zap.Int64("tableID", span.TableID), zap.Uint64("checkpointTs", stats.CheckpointTs)) return stats.CheckpointTs, true } - table := p.tables[tableID] + table := p.tables[span.TableID] p.metricRemainKVEventGauge.Sub(float64(table.RemainEvents())) table.Cancel() table.Wait() - delete(p.tables, tableID) + delete(p.tables, span.TableID) checkpointTs := table.CheckpointTs() log.Info("table removed", zap.String("captureID", p.captureInfo.ID), zap.String("namespace", p.changefeedID.Namespace), zap.String("changefeed", p.changefeedID.ID), - zap.Int64("tableID", tableID), + zap.Int64("tableID", span.TableID), zap.Uint64("checkpointTs", checkpointTs)) return checkpointTs, true } // GetAllCurrentTables implements TableExecutor interface. -func (p *processor) GetAllCurrentTables() []model.TableID { +func (p *processor) GetAllCurrentTables() int { if p.pullBasedSinking { - return p.sinkManager.GetAllCurrentTableIDs() - } - ret := make([]model.TableID, 0, len(p.tables)) - for tableID := range p.tables { - ret = append(ret, tableID) + return len(p.sinkManager.GetAllCurrentTableIDs()) } - return ret + return len(p.tables) } // GetCheckpoint implements TableExecutor interface. @@ -462,35 +461,39 @@ func (p *processor) GetCheckpoint() (checkpointTs, resolvedTs model.Ts) { } // GetTableStatus implements TableExecutor interface -func (p *processor) GetTableStatus(tableID model.TableID) tablepb.TableStatus { +func (p *processor) GetTableStatus(span tablepb.Span) tablepb.TableStatus { if p.pullBasedSinking { - state, exist := p.sinkManager.GetTableState(tableID) + state, exist := p.sinkManager.GetTableState(span.TableID) if !exist { return tablepb.TableStatus{ - TableID: tableID, + TableID: span.TableID, + Span: span, State: tablepb.TableStateAbsent, } } - sinkStats := p.sinkManager.GetTableStats(tableID) + sinkStats := p.sinkManager.GetTableStats(span.TableID) return tablepb.TableStatus{ - TableID: tableID, + TableID: span.TableID, + Span: span, Checkpoint: tablepb.Checkpoint{ CheckpointTs: sinkStats.CheckpointTs, ResolvedTs: sinkStats.ResolvedTs, }, State: state, - Stats: p.getStatsFromSourceManagerAndSinkManager(tableID, sinkStats), + Stats: p.getStatsFromSourceManagerAndSinkManager(span.TableID, sinkStats), } } - table, ok := p.tables[tableID] + table, ok := p.tables[span.TableID] if !ok { return tablepb.TableStatus{ - TableID: tableID, + TableID: span.TableID, + Span: span, State: tablepb.TableStateAbsent, } } return tablepb.TableStatus{ - TableID: tableID, + TableID: span.TableID, + Span: span, Checkpoint: tablepb.Checkpoint{ CheckpointTs: table.CheckpointTs(), ResolvedTs: table.ResolvedTs(), @@ -924,9 +927,10 @@ func (p *processor) newAgentImpl( messageRouter := ctx.GlobalVars().MessageRouter etcdClient := ctx.GlobalVars().EtcdClient captureID := ctx.GlobalVars().CaptureInfo.ID + cfg := config.GetGlobalServerConfig().Debug.Scheduler ret, err = scheduler.NewAgent( ctx, captureID, liveness, - messageServer, messageRouter, etcdClient, p, p.changefeedID) + messageServer, messageRouter, etcdClient, p, p.changefeedID, cfg) return ret, errors.Trace(err) } diff --git a/cdc/processor/processor_test.go b/cdc/processor/processor_test.go index 93f13ebb7d5..1c2ab2862f3 100644 --- a/cdc/processor/processor_test.go +++ b/cdc/processor/processor_test.go @@ -33,6 +33,7 @@ import ( cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/etcd" "github.com/pingcap/tiflow/pkg/orchestrator" + "github.com/pingcap/tiflow/pkg/spanz" "github.com/pingcap/tiflow/pkg/upstream" "github.com/stretchr/testify/require" ) @@ -248,7 +249,7 @@ type mockAgent struct { } func (a *mockAgent) Tick(_ context.Context) error { - if len(a.executor.GetAllCurrentTables()) == 0 { + if a.executor.GetAllCurrentTables() == 0 { return nil } a.lastCheckpointTs, _ = a.executor.GetCheckpoint() @@ -287,7 +288,7 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) { tester.MustApplyPatches() // table-1: `preparing` -> `prepared` -> `replicating` - ok, err := p.AddTable(ctx, 1, 20, true) + ok, err := p.AddTable(ctx, spanz.TableIDToComparableSpan(1), 20, true) require.NoError(t, err) require.True(t, ok) @@ -301,7 +302,7 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) { checkpointTs := p.agent.GetLastSentCheckpointTs() require.Equal(t, checkpointTs, model.Ts(0)) - done := p.IsAddTableFinished(1, true) + done := p.IsAddTableFinished(spanz.TableIDToComparableSpan(1), true) require.False(t, done) require.Equal(t, tablepb.TableStatePreparing, table1.State()) @@ -312,7 +313,7 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) { require.Nil(t, err) tester.MustApplyPatches() - done = p.IsAddTableFinished(1, true) + done = p.IsAddTableFinished(spanz.TableIDToComparableSpan(1), true) require.True(t, done) require.Equal(t, tablepb.TableStatePrepared, table1.State()) @@ -320,12 +321,12 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) { checkpointTs = p.agent.GetLastSentCheckpointTs() require.Equal(t, checkpointTs, model.Ts(20)) - ok, err = p.AddTable(ctx, 1, 30, true) + ok, err = p.AddTable(ctx, spanz.TableIDToComparableSpan(1), 30, true) require.NoError(t, err) require.True(t, ok) require.Equal(t, model.Ts(0), table1.sinkStartTs) - ok, err = p.AddTable(ctx, 1, 30, false) + ok, err = p.AddTable(ctx, spanz.TableIDToComparableSpan(1), 30, false) require.NoError(t, err) require.True(t, ok) require.Equal(t, model.Ts(30), table1.sinkStartTs) @@ -336,7 +337,7 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) { require.Nil(t, err) tester.MustApplyPatches() - done = p.IsAddTableFinished(1, false) + done = p.IsAddTableFinished(spanz.TableIDToComparableSpan(1), false) require.True(t, done) require.Equal(t, tablepb.TableStateReplicating, table1.State()) @@ -370,32 +371,33 @@ func TestTableExecutorAddingTableDirectly(t *testing.T) { require.NoError(t, err) tester.MustApplyPatches() - ok, err := p.AddTable(ctx, 1, 20, false) + ok, err := p.AddTable(ctx, spanz.TableIDToComparableSpan(1), 20, false) require.NoError(t, err) require.True(t, ok) table1 := p.tables[1].(*mockTablePipeline) require.Equal(t, model.Ts(20), table1.sinkStartTs) require.Equal(t, tablepb.TableStatePreparing, table1.state) - meta := p.GetTableStatus(model.TableID(1)) + meta := p.GetTableStatus(spanz.TableIDToComparableSpan(1)) require.Equal(t, model.TableID(1), meta.TableID) + require.Equal(t, spanz.TableIDToComparableSpan(1), meta.Span) require.Equal(t, tablepb.TableStatePreparing, meta.State) - ok, err = p.AddTable(ctx, 2, 20, false) + ok, err = p.AddTable(ctx, spanz.TableIDToComparableSpan(2), 20, false) require.NoError(t, err) require.True(t, ok) table2 := p.tables[2].(*mockTablePipeline) require.Equal(t, model.Ts(20), table2.sinkStartTs) require.Equal(t, tablepb.TableStatePreparing, table2.state) - ok, err = p.AddTable(ctx, 3, 20, false) + ok, err = p.AddTable(ctx, spanz.TableIDToComparableSpan(3), 20, false) require.NoError(t, err) require.True(t, ok) table3 := p.tables[3].(*mockTablePipeline) require.Equal(t, model.Ts(20), table3.sinkStartTs) require.Equal(t, tablepb.TableStatePreparing, table3.state) - ok, err = p.AddTable(ctx, 4, 20, false) + ok, err = p.AddTable(ctx, spanz.TableIDToComparableSpan(4), 20, false) require.NoError(t, err) require.True(t, ok) table4 := p.tables[4].(*mockTablePipeline) @@ -407,16 +409,16 @@ func TestTableExecutorAddingTableDirectly(t *testing.T) { checkpointTs := p.agent.GetLastSentCheckpointTs() require.Equal(t, checkpointTs, model.Ts(0)) - done := p.IsAddTableFinished(1, false) + done := p.IsAddTableFinished(spanz.TableIDToComparableSpan(1), false) require.False(t, done) require.Equal(t, tablepb.TableStatePreparing, table1.State()) - done = p.IsAddTableFinished(2, false) + done = p.IsAddTableFinished(spanz.TableIDToComparableSpan(2), false) require.False(t, done) require.Equal(t, tablepb.TableStatePreparing, table2.State()) - done = p.IsAddTableFinished(3, false) + done = p.IsAddTableFinished(spanz.TableIDToComparableSpan(3), false) require.False(t, done) require.Equal(t, tablepb.TableStatePreparing, table3.State()) - done = p.IsAddTableFinished(4, false) + done = p.IsAddTableFinished(spanz.TableIDToComparableSpan(4), false) require.False(t, done) require.Equal(t, tablepb.TableStatePreparing, table4.State()) require.Len(t, p.tables, 4) @@ -436,16 +438,16 @@ func TestTableExecutorAddingTableDirectly(t *testing.T) { table3.checkpointTs = 30 table4.checkpointTs = 30 - done = p.IsAddTableFinished(1, false) + done = p.IsAddTableFinished(spanz.TableIDToComparableSpan(1), false) require.True(t, done) require.Equal(t, tablepb.TableStateReplicating, table1.State()) - done = p.IsAddTableFinished(2, false) + done = p.IsAddTableFinished(spanz.TableIDToComparableSpan(2), false) require.True(t, done) require.Equal(t, tablepb.TableStateReplicating, table2.State()) - done = p.IsAddTableFinished(3, false) + done = p.IsAddTableFinished(spanz.TableIDToComparableSpan(3), false) require.True(t, done) require.Equal(t, tablepb.TableStateReplicating, table3.State()) - done = p.IsAddTableFinished(4, false) + done = p.IsAddTableFinished(spanz.TableIDToComparableSpan(4), false) require.True(t, done) require.Equal(t, tablepb.TableStateReplicating, table4.State()) @@ -471,7 +473,7 @@ func TestTableExecutorAddingTableDirectly(t *testing.T) { require.NoError(t, err) tester.MustApplyPatches() - ok = p.RemoveTable(3) + ok = p.RemoveTable(spanz.TableIDToComparableSpan(3)) require.True(t, ok) err = p.Tick(ctx) @@ -483,7 +485,7 @@ func TestTableExecutorAddingTableDirectly(t *testing.T) { require.False(t, table3.canceled) require.Equal(t, model.Ts(60), table3.CheckpointTs()) - checkpointTs, done = p.IsRemoveTableFinished(3) + checkpointTs, done = p.IsRemoveTableFinished(spanz.TableIDToComparableSpan(3)) require.False(t, done) require.Equal(t, model.Ts(0), checkpointTs) @@ -506,11 +508,12 @@ func TestTableExecutorAddingTableDirectly(t *testing.T) { require.Len(t, p.tables, 4) require.False(t, table3.canceled) - checkpointTs, done = p.IsRemoveTableFinished(3) + checkpointTs, done = p.IsRemoveTableFinished(spanz.TableIDToComparableSpan(3)) require.True(t, done) require.Equal(t, model.Ts(65), checkpointTs) - meta = p.GetTableStatus(model.TableID(3)) + meta = p.GetTableStatus(spanz.TableIDToComparableSpan(3)) require.Equal(t, model.TableID(3), meta.TableID) + require.Equal(t, spanz.TableIDToComparableSpan(3), meta.Span) require.Equal(t, tablepb.TableStateAbsent, meta.State) require.Len(t, p.tables, 3) @@ -602,10 +605,10 @@ func TestProcessorClose(t *testing.T) { tester.MustApplyPatches() // add tables - done, err := p.AddTable(ctx, model.TableID(1), 20, false) + done, err := p.AddTable(ctx, spanz.TableIDToComparableSpan(1), 20, false) require.Nil(t, err) require.True(t, done) - done, err = p.AddTable(ctx, model.TableID(2), 30, false) + done, err = p.AddTable(ctx, spanz.TableIDToComparableSpan(2), 30, false) require.Nil(t, err) require.True(t, done) @@ -642,10 +645,10 @@ func TestProcessorClose(t *testing.T) { tester.MustApplyPatches() // add tables - done, err = p.AddTable(ctx, model.TableID(1), 20, false) + done, err = p.AddTable(ctx, spanz.TableIDToComparableSpan(1), 20, false) require.Nil(t, err) require.True(t, done) - done, err = p.AddTable(ctx, model.TableID(2), 30, false) + done, err = p.AddTable(ctx, spanz.TableIDToComparableSpan(2), 30, false) require.Nil(t, err) require.True(t, done) err = p.Tick(ctx) @@ -675,10 +678,10 @@ func TestPositionDeleted(t *testing.T) { p, tester := initProcessor4Test(ctx, t, &liveness) var err error // add table - done, err := p.AddTable(ctx, model.TableID(1), 30, false) + done, err := p.AddTable(ctx, spanz.TableIDToComparableSpan(1), 30, false) require.Nil(t, err) require.True(t, done) - done, err = p.AddTable(ctx, model.TableID(2), 40, false) + done, err = p.AddTable(ctx, spanz.TableIDToComparableSpan(2), 40, false) require.Nil(t, err) require.True(t, done) // init tick @@ -689,11 +692,11 @@ func TestPositionDeleted(t *testing.T) { table1 := p.tables[1].(*mockTablePipeline) table2 := p.tables[2].(*mockTablePipeline) - table1.resolvedTs += 1 - table2.resolvedTs += 1 + table1.resolvedTs++ + table2.resolvedTs++ - table1.checkpointTs += 1 - table2.checkpointTs += 1 + table1.checkpointTs++ + table2.checkpointTs++ // cal position err = p.Tick(ctx) @@ -795,7 +798,7 @@ func TestUpdateBarrierTs(t *testing.T) { }) p.schemaStorage.(*mockSchemaStorage).resolvedTs = 10 - done, err := p.AddTable(ctx, model.TableID(1), 5, false) + done, err := p.AddTable(ctx, spanz.TableIDToComparableSpan(1), 5, false) require.True(t, done) require.Nil(t, err) err = p.Tick(ctx) diff --git a/cdc/scheduler/internal/table_executor.go b/cdc/scheduler/internal/table_executor.go index 5c7736f9ec3..680c82eb424 100644 --- a/cdc/scheduler/internal/table_executor.go +++ b/cdc/scheduler/internal/table_executor.go @@ -30,18 +30,18 @@ type TableExecutor interface { // if `isPrepare` is true, the 1st phase of the 2 phase scheduling protocol. // if `isPrepare` is false, the 2nd phase. AddTable( - ctx context.Context, tableID model.TableID, startTs model.Ts, isPrepare bool, + ctx context.Context, span tablepb.Span, startTs model.Ts, isPrepare bool, ) (done bool, err error) // IsAddTableFinished make sure the requested table is in the proper status - IsAddTableFinished(tableID model.TableID, isPrepare bool) (done bool) + IsAddTableFinished(span tablepb.Span, isPrepare bool) (done bool) // RemoveTable remove the table, return true if the table is already removed - RemoveTable(tableID model.TableID) (done bool) + RemoveTable(span tablepb.Span) (done bool) // IsRemoveTableFinished convince the table is fully stopped. // return false if table is not stopped // return true and corresponding checkpoint otherwise. - IsRemoveTableFinished(tableID model.TableID) (model.Ts, bool) + IsRemoveTableFinished(span tablepb.Span) (model.Ts, bool) // GetAllCurrentTables should return all tables that are being run, // being added and being removed. @@ -49,7 +49,7 @@ type TableExecutor interface { // NOTE: two subsequent calls to the method should return the same // result, unless there is a call to AddTable, RemoveTable, IsAddTableFinished // or IsRemoveTableFinished in between two calls to this method. - GetAllCurrentTables() []model.TableID + GetAllCurrentTables() int // GetCheckpoint returns the local checkpoint-ts and resolved-ts of // the processor. Its calculation should take into consideration all @@ -58,5 +58,5 @@ type TableExecutor interface { GetCheckpoint() (checkpointTs, resolvedTs model.Ts) // GetTableStatus return the checkpoint and resolved ts for the given table - GetTableStatus(tableID model.TableID) tablepb.TableStatus + GetTableStatus(span tablepb.Span) tablepb.TableStatus } diff --git a/cdc/scheduler/internal/v3/agent/agent.go b/cdc/scheduler/internal/v3/agent/agent.go index db302864bb2..5e8be028ecd 100644 --- a/cdc/scheduler/internal/v3/agent/agent.go +++ b/cdc/scheduler/internal/v3/agent/agent.go @@ -23,8 +23,10 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/processor/tablepb" "github.com/pingcap/tiflow/cdc/scheduler/internal" + "github.com/pingcap/tiflow/cdc/scheduler/internal/v3/compat" "github.com/pingcap/tiflow/cdc/scheduler/internal/v3/transport" "github.com/pingcap/tiflow/cdc/scheduler/schedulepb" + "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/etcd" "github.com/pingcap/tiflow/pkg/p2p" @@ -37,7 +39,8 @@ var _ internal.Agent = (*agent)(nil) type agent struct { agentInfo - trans transport.Transport + trans transport.Transport + compat *compat.Compat tableM *tableManager @@ -74,9 +77,8 @@ func newAgentInfo(changefeedID model.ChangeFeedID, captureID model.CaptureID) ag } type ownerInfo struct { - Revision schedulepb.OwnerRevision - Version string - CaptureID string + model.CaptureInfo + Revision schedulepb.OwnerRevision } func newAgent( @@ -86,11 +88,13 @@ func newAgent( changeFeedID model.ChangeFeedID, etcdClient etcd.CDCEtcdClient, tableExecutor internal.TableExecutor, + cfg *config.SchedulerConfig, ) (internal.Agent, error) { result := &agent{ agentInfo: newAgentInfo(changeFeedID, captureID), tableM: newTableManager(changeFeedID, tableExecutor), liveness: liveness, + compat: compat.New(cfg, map[model.CaptureID]*model.CaptureInfo{}), } etcdCliCtx, cancel := context.WithTimeout(ctx, 5*time.Second) @@ -111,6 +115,25 @@ func newAgent( zap.Error(err)) return result, nil } + var ownerCaptureInfo *model.CaptureInfo + _, captures, err := etcdClient.GetCaptures(ctx) + for _, captureInfo := range captures { + if captureInfo.ID == ownerCaptureID { + ownerCaptureInfo = captureInfo + break + } + } + if ownerCaptureInfo == nil { + log.Info("schedulerv3: no owner found. We will wait for an owner to contact us.", + zap.String("ownerCaptureID", ownerCaptureID), + zap.String("namespace", changeFeedID.Namespace), + zap.String("changefeed", changeFeedID.ID), + zap.Error(err)) + return result, nil + } + result.compat.UpdateCaptureInfo(map[model.CaptureID]*model.CaptureInfo{ + ownerCaptureID: ownerCaptureInfo, + }) log.Info("schedulerv3: agent owner found", zap.String("ownerCaptureID", ownerCaptureID), @@ -133,11 +156,12 @@ func newAgent( return nil, err } + // We don't need address, and owner info will be updated when there is a + // new owner elected. To avoid confusion, just leave it empty. + ownerCaptureInfo.AdvertiseAddr = "" result.ownerInfo = ownerInfo{ - // owner's version can only be got by receiving heartbeat - Version: "", - CaptureID: ownerCaptureID, - Revision: schedulepb.OwnerRevision{Revision: revision}, + Revision: schedulepb.OwnerRevision{Revision: revision}, + CaptureInfo: *ownerCaptureInfo, } return result, nil } @@ -151,9 +175,10 @@ func NewAgent(ctx context.Context, messageRouter p2p.MessageRouter, etcdClient etcd.CDCEtcdClient, tableExecutor internal.TableExecutor, + cfg *config.SchedulerConfig, ) (internal.Agent, error) { result, err := newAgent( - ctx, captureID, liveness, changeFeedID, etcdClient, tableExecutor) + ctx, captureID, liveness, changeFeedID, etcdClient, tableExecutor, cfg) if err != nil { return nil, errors.Trace(err) } @@ -235,17 +260,18 @@ func (a *agent) handleMessage(msg []*schedulepb.Message) []*schedulepb.Message { func (a *agent) handleMessageHeartbeat(request *schedulepb.Heartbeat) *schedulepb.Message { allTables := a.tableM.getAllTables() - result := make([]tablepb.TableStatus, 0, len(allTables)) - for _, table := range allTables { + result := make([]tablepb.TableStatus, 0, allTables.Len()) + allTables.Ascend(func(span tablepb.Span, table *table) bool { status := table.getTableStatus() if table.task != nil && table.task.IsRemove { status.State = tablepb.TableStateStopping } result = append(result, status) - } - for _, tableID := range request.GetTableIDs() { - if _, ok := allTables[tableID]; !ok { - status := a.tableM.getTableStatus(tableID) + return true + }) + for _, span := range request.GetSpans() { + if _, ok := allTables.Get(span); !ok { + status := a.tableM.getTableStatus(span) result = append(result, status) } } @@ -280,7 +306,7 @@ const ( ) type dispatchTableTask struct { - TableID model.TableID + Span tablepb.Span StartTs model.Ts IsRemove bool IsPrepare bool @@ -311,31 +337,31 @@ func (a *agent) handleMessageDispatchTableRequest( // this should be guaranteed by the caller of the method. switch req := request.Request.(type) { case *schedulepb.DispatchTableRequest_AddTable: - tableID := req.AddTable.GetTableID() + span := req.AddTable.GetSpan() task = &dispatchTableTask{ - TableID: tableID, + Span: span, StartTs: req.AddTable.GetCheckpoint().CheckpointTs, IsRemove: false, IsPrepare: req.AddTable.GetIsSecondary(), Epoch: epoch, status: dispatchTableTaskReceived, } - table = a.tableM.addTable(tableID) + table = a.tableM.addTable(span) case *schedulepb.DispatchTableRequest_RemoveTable: - tableID := req.RemoveTable.GetTableID() - table, ok = a.tableM.getTable(tableID) + span := req.RemoveTable.GetSpan() + table, ok = a.tableM.getTable(span) if !ok { log.Warn("schedulerv3: agent ignore remove table request, "+ "since the table not found", zap.String("capture", a.CaptureID), zap.String("namespace", a.ChangeFeedID.Namespace), zap.String("changefeed", a.ChangeFeedID.ID), - zap.Any("tableID", tableID), + zap.Stringer("span", &span), zap.Any("request", request)) return } task = &dispatchTableTask{ - TableID: tableID, + Span: span, IsRemove: true, Epoch: epoch, status: dispatchTableTaskReceived, @@ -373,7 +399,7 @@ func (a *agent) Close() error { // version: the incoming owner's semantic version string func (a *agent) handleOwnerInfo(id model.CaptureID, revision int64, version string) bool { if a.ownerInfo.Revision.Revision == revision { - if a.ownerInfo.CaptureID != id { + if a.ownerInfo.ID != id { // This panic will happen only if two messages have been received // with the same ownerRev but with different ownerIDs. // This should never happen unless the election via Etcd is buggy. @@ -381,7 +407,7 @@ func (a *agent) handleOwnerInfo(id model.CaptureID, revision int64, version stri zap.String("capture", a.CaptureID), zap.String("namespace", a.ChangeFeedID.Namespace), zap.String("changefeed", a.ChangeFeedID.ID), - zap.String("expected", a.ownerInfo.CaptureID), + zap.String("expected", a.ownerInfo.ID), zap.String("actual", id)) } return true @@ -389,12 +415,15 @@ func (a *agent) handleOwnerInfo(id model.CaptureID, revision int64, version stri // the current owner is staled if a.ownerInfo.Revision.Revision < revision { - a.ownerInfo.CaptureID = id + a.ownerInfo.CaptureInfo.ID = id + a.ownerInfo.CaptureInfo.Version = version a.ownerInfo.Revision.Revision = revision - a.ownerInfo.Version = version a.resetEpoch() + a.compat.UpdateCaptureInfo(map[model.CaptureID]*model.CaptureInfo{ + id: &a.ownerInfo.CaptureInfo, + }) log.Info("schedulerv3: new owner in power", zap.String("capture", a.CaptureID), zap.String("namespace", a.ChangeFeedID.Namespace), @@ -409,9 +438,11 @@ func (a *agent) handleOwnerInfo(id model.CaptureID, revision int64, version stri zap.String("namespace", a.ChangeFeedID.Namespace), zap.String("changefeed", a.ChangeFeedID.ID), zap.Any("staledOwner", ownerInfo{ - CaptureID: id, - Revision: schedulepb.OwnerRevision{Revision: revision}, - Version: version, + CaptureInfo: model.CaptureInfo{ + ID: id, + Version: version, + }, + Revision: schedulepb.OwnerRevision{Revision: revision}, }), zap.Any("owner", a.ownerInfo), zap.Any("agent", a.agentInfo)) @@ -433,6 +464,7 @@ func (a *agent) recvMsgs(ctx context.Context) ([]*schedulepb.Message, error) { messages[n] = val n++ } + a.compat.AfterTransportReceive(messages[:n]) return messages[:n], nil } @@ -452,7 +484,8 @@ func (a *agent) sendMsgs(ctx context.Context, msgs []*schedulepb.Message) error ProcessorEpoch: a.Epoch, } m.From = a.CaptureID - m.To = a.ownerInfo.CaptureID + m.To = a.ownerInfo.ID } + a.compat.BeforeTransportSend(msgs) return a.trans.Send(ctx, msgs) } diff --git a/cdc/scheduler/internal/v3/agent/agent_bench_test.go b/cdc/scheduler/internal/v3/agent/agent_bench_test.go index e82325a4db8..d6020fdb551 100644 --- a/cdc/scheduler/internal/v3/agent/agent_bench_test.go +++ b/cdc/scheduler/internal/v3/agent/agent_bench_test.go @@ -19,18 +19,22 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/scheduler/schedulepb" + "github.com/pingcap/tiflow/pkg/spanz" ) func benchmarkHeartbeatResponse(b *testing.B, bench func(b *testing.B, a *agent)) { upperBound := 16384 for size := 1; size <= upperBound; size *= 2 { tableExec := newMockTableExecutor() + liveness := model.LivenessCaptureAlive a := &agent{ - tableM: newTableManager(model.ChangeFeedID{}, tableExec), + tableM: newTableManager(model.ChangeFeedID{}, tableExec), + liveness: &liveness, } for j := 0; j < size; j++ { - _ = a.tableM.addTable(model.TableID(10000 + j)) + span := spanz.TableIDToComparableSpan(model.TableID(10000 + j)) + _ = a.tableM.addTable(span) } b.ResetTimer() @@ -41,7 +45,7 @@ func benchmarkHeartbeatResponse(b *testing.B, bench func(b *testing.B, a *agent) func BenchmarkRefreshAllTables(b *testing.B) { benchmarkHeartbeatResponse(b, func(b *testing.B, a *agent) { - total := len(a.tableM.tables) + total := a.tableM.tables.Len() b.Run(fmt.Sprintf("%d tables", total), func(b *testing.B) { for i := 0; i < b.N; i++ { a.handleMessageHeartbeat(&schedulepb.Heartbeat{}) diff --git a/cdc/scheduler/internal/v3/agent/agent_test.go b/cdc/scheduler/internal/v3/agent/agent_test.go index 4727db6e27f..15930f498fc 100644 --- a/cdc/scheduler/internal/v3/agent/agent_test.go +++ b/cdc/scheduler/internal/v3/agent/agent_test.go @@ -22,10 +22,14 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/processor/tablepb" + "github.com/pingcap/tiflow/cdc/scheduler/internal" + "github.com/pingcap/tiflow/cdc/scheduler/internal/v3/compat" "github.com/pingcap/tiflow/cdc/scheduler/internal/v3/transport" "github.com/pingcap/tiflow/cdc/scheduler/schedulepb" + "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" mock_etcd "github.com/pingcap/tiflow/pkg/etcd/mock" + "github.com/pingcap/tiflow/pkg/spanz" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.etcd.io/etcd/client/v3/concurrency" @@ -60,10 +64,14 @@ func iterPermutation(sequence []int, fn func(sequence []int)) { func newAgent4Test() *agent { a := &agent{ ownerInfo: ownerInfo{ - Version: "owner-version-1", - CaptureID: "owner-1", - Revision: schedulepb.OwnerRevision{Revision: 1}, + CaptureInfo: model.CaptureInfo{ + Version: "owner-version-1", + ID: "owner-1", + }, + Revision: schedulepb.OwnerRevision{Revision: 1}, }, + compat: compat.New( + config.GetDefaultServerConfig().Debug.Scheduler, map[string]*model.CaptureInfo{}), } a.Version = "agent-version-1" @@ -82,12 +90,15 @@ func TestNewAgent(t *testing.T) { me := mock_etcd.NewMockCDCEtcdClient(gomock.NewController(t)) tableExector := newMockTableExecutor() + cfg := &config.SchedulerConfig{RegionPerSpan: 1} // owner and revision found successfully - me.EXPECT().GetOwnerID(gomock.Any()).Return("owneID", nil).Times(1) + me.EXPECT().GetOwnerID(gomock.Any()).Return("ownerID", nil).Times(1) + me.EXPECT().GetCaptures( + gomock.Any()).Return(int64(0), []*model.CaptureInfo{{ID: "ownerID"}}, nil).Times(1) me.EXPECT().GetOwnerRevision(gomock.Any(), gomock.Any()).Return(int64(2333), nil).Times(1) a, err := newAgent( - context.Background(), "capture-test", &liveness, changefeed, me, tableExector) + context.Background(), "capture-test", &liveness, changefeed, me, tableExector, cfg) require.NoError(t, err) require.NotNil(t, a) @@ -95,31 +106,35 @@ func TestNewAgent(t *testing.T) { me.EXPECT().GetOwnerID(gomock.Any()). Return("", concurrency.ErrElectionNoLeader).Times(1) a, err = newAgent( - context.Background(), "capture-test", &liveness, changefeed, me, tableExector) + context.Background(), "capture-test", &liveness, changefeed, me, tableExector, cfg) require.NoError(t, err) require.NotNil(t, a) // owner not found since pd is unstable me.EXPECT().GetOwnerID(gomock.Any()).Return("", cerror.ErrPDEtcdAPIError).Times(1) a, err = newAgent( - context.Background(), "capture-test", &liveness, changefeed, me, tableExector) + context.Background(), "capture-test", &liveness, changefeed, me, tableExector, cfg) require.Error(t, err) require.Nil(t, a) // owner found, get revision failed. me.EXPECT().GetOwnerID(gomock.Any()).Return("ownerID", nil).Times(1) + me.EXPECT().GetCaptures( + gomock.Any()).Return(int64(0), []*model.CaptureInfo{{ID: "ownerID"}}, nil).Times(1) me.EXPECT().GetOwnerRevision(gomock.Any(), gomock.Any()). Return(int64(0), cerror.ErrPDEtcdAPIError).Times(1) a, err = newAgent( - context.Background(), "capture-test", &liveness, changefeed, me, tableExector) + context.Background(), "capture-test", &liveness, changefeed, me, tableExector, cfg) require.Error(t, err) require.Nil(t, a) me.EXPECT().GetOwnerID(gomock.Any()).Return("ownerID", nil).Times(1) + me.EXPECT().GetCaptures( + gomock.Any()).Return(int64(0), []*model.CaptureInfo{{ID: "ownerID"}}, nil).Times(1) me.EXPECT().GetOwnerRevision(gomock.Any(), gomock.Any()). Return(int64(0), cerror.ErrOwnerNotFound).Times(1) a, err = newAgent( - context.Background(), "capture-test", &liveness, changefeed, me, tableExector) + context.Background(), "capture-test", &liveness, changefeed, me, tableExector, cfg) require.NoError(t, err) require.NotNil(t, a) } @@ -134,7 +149,7 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) { removeTableRequest := &schedulepb.DispatchTableRequest{ Request: &schedulepb.DispatchTableRequest_RemoveTable{ RemoveTable: &schedulepb.RemoveTableRequest{ - TableID: 1, + Span: spanz.TableIDToComparableSpan(1), }, }, } @@ -150,7 +165,7 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) { addTableRequest := &schedulepb.DispatchTableRequest{ Request: &schedulepb.DispatchTableRequest_AddTable{ AddTable: &schedulepb.AddTableRequest{ - TableID: 1, + Span: spanz.TableIDToComparableSpan(1), IsSecondary: true, }, }, @@ -169,9 +184,9 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) { addTableResponse, ok := responses[0].DispatchTableResponse. Response.(*schedulepb.DispatchTableResponse_AddTable) require.True(t, ok) - require.Equal(t, model.TableID(1), addTableResponse.AddTable.Status.TableID) + require.Equal(t, model.TableID(1), addTableResponse.AddTable.Status.Span.TableID) require.Equal(t, tablepb.TableStateAbsent, addTableResponse.AddTable.Status.State) - require.NotContains(t, a.tableM.tables, model.TableID(1)) + require.False(t, a.tableM.tables.Has(spanz.TableIDToComparableSpan(1))) // Force set liveness to alive. *a.liveness = model.LivenessCaptureAlive @@ -196,9 +211,9 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) { addTableResponse, ok = responses[0].DispatchTableResponse. Response.(*schedulepb.DispatchTableResponse_AddTable) require.True(t, ok) - require.Equal(t, model.TableID(1), addTableResponse.AddTable.Status.TableID) + require.Equal(t, model.TableID(1), addTableResponse.AddTable.Status.Span.TableID) require.Equal(t, tablepb.TableStatePrepared, addTableResponse.AddTable.Status.State) - require.Contains(t, a.tableM.tables, model.TableID(1)) + require.True(t, a.tableM.tables.Has(spanz.TableIDToComparableSpan(1))) // let the prepared table become replicating, by set `IsSecondary` to false. addTableRequest.Request.(*schedulepb.DispatchTableRequest_AddTable). @@ -217,9 +232,9 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) { addTableResponse, ok = responses[0].DispatchTableResponse. Response.(*schedulepb.DispatchTableResponse_AddTable) require.True(t, ok) - require.Equal(t, model.TableID(1), addTableResponse.AddTable.Status.TableID) + require.Equal(t, model.TableID(1), addTableResponse.AddTable.Status.Span.TableID) require.Equal(t, tablepb.TableStatePrepared, addTableResponse.AddTable.Status.State) - require.Contains(t, a.tableM.tables, model.TableID(1)) + require.True(t, a.tableM.tables.Has(spanz.TableIDToComparableSpan(1))) mockTableExecutor.ExpectedCalls = nil mockTableExecutor.On("IsAddTableFinished", mock.Anything, @@ -232,9 +247,9 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) { addTableResponse, ok = responses[0].DispatchTableResponse. Response.(*schedulepb.DispatchTableResponse_AddTable) require.True(t, ok) - require.Equal(t, model.TableID(1), addTableResponse.AddTable.Status.TableID) + require.Equal(t, model.TableID(1), addTableResponse.AddTable.Status.Span.TableID) require.Equal(t, tablepb.TableStateReplicating, addTableResponse.AddTable.Status.State) - require.Contains(t, a.tableM.tables, model.TableID(1)) + require.True(t, a.tableM.tables.Has(spanz.TableIDToComparableSpan(1))) mockTableExecutor.On("RemoveTable", mock.Anything, mock.Anything). Return(false) @@ -246,9 +261,9 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) { removeTableResponse, ok := responses[0].DispatchTableResponse. Response.(*schedulepb.DispatchTableResponse_RemoveTable) require.True(t, ok) - require.Equal(t, model.TableID(1), removeTableResponse.RemoveTable.Status.TableID) + require.Equal(t, model.TableID(1), removeTableResponse.RemoveTable.Status.Span.TableID) require.Equal(t, tablepb.TableStateStopping, removeTableResponse.RemoveTable.Status.State) - require.Contains(t, a.tableM.tables, model.TableID(1)) + require.True(t, a.tableM.tables.Has(spanz.TableIDToComparableSpan(1))) mockTableExecutor.ExpectedCalls = nil mockTableExecutor.On("RemoveTable", mock.Anything, mock.Anything). @@ -263,7 +278,7 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) { removeTableResponse, ok = responses[0].DispatchTableResponse. Response.(*schedulepb.DispatchTableResponse_RemoveTable) require.True(t, ok) - require.Equal(t, model.TableID(1), removeTableResponse.RemoveTable.Status.TableID) + require.Equal(t, model.TableID(1), removeTableResponse.RemoveTable.Status.Span.TableID) require.Equal(t, tablepb.TableStateStopping, removeTableResponse.RemoveTable.Status.State) mockTableExecutor.ExpectedCalls = mockTableExecutor.ExpectedCalls[:1] @@ -277,10 +292,10 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) { removeTableResponse, ok = responses[0].DispatchTableResponse. Response.(*schedulepb.DispatchTableResponse_RemoveTable) require.True(t, ok) - require.Equal(t, model.TableID(1), removeTableResponse.RemoveTable.Status.TableID) + require.Equal(t, model.TableID(1), removeTableResponse.RemoveTable.Status.Span.TableID) require.Equal(t, tablepb.TableStateStopped, removeTableResponse.RemoveTable.Status.State) require.Equal(t, model.Ts(3), removeTableResponse.RemoveTable.Checkpoint.CheckpointTs) - require.NotContains(t, a.tableM.tables, model.TableID(1)) + require.False(t, a.tableM.tables.Has(spanz.TableIDToComparableSpan(1))) } func TestAgentHandleMessageHeartbeat(t *testing.T) { @@ -291,20 +306,25 @@ func TestAgentHandleMessageHeartbeat(t *testing.T) { a.tableM = newTableManager(model.ChangeFeedID{}, mockTableExecutor) for i := 0; i < 5; i++ { - a.tableM.addTable(model.TableID(i)) + a.tableM.addTable(spanz.TableIDToComparableSpan(int64(i))) } - a.tableM.tables[model.TableID(0)].state = tablepb.TableStatePreparing - a.tableM.tables[model.TableID(1)].state = tablepb.TableStatePrepared - a.tableM.tables[model.TableID(2)].state = tablepb.TableStateReplicating - a.tableM.tables[model.TableID(3)].state = tablepb.TableStateStopping - a.tableM.tables[model.TableID(4)].state = tablepb.TableStateStopped - - mockTableExecutor.tables[model.TableID(0)] = tablepb.TableStatePreparing - mockTableExecutor.tables[model.TableID(1)] = tablepb.TableStatePrepared - mockTableExecutor.tables[model.TableID(2)] = tablepb.TableStateReplicating - mockTableExecutor.tables[model.TableID(3)] = tablepb.TableStateStopping - mockTableExecutor.tables[model.TableID(4)] = tablepb.TableStateStopped + a.tableM.tables.GetV(spanz.TableIDToComparableSpan(0)).state = tablepb.TableStatePreparing + a.tableM.tables.GetV(spanz.TableIDToComparableSpan(1)).state = tablepb.TableStatePrepared + a.tableM.tables.GetV(spanz.TableIDToComparableSpan(2)).state = tablepb.TableStateReplicating + a.tableM.tables.GetV(spanz.TableIDToComparableSpan(3)).state = tablepb.TableStateStopping + a.tableM.tables.GetV(spanz.TableIDToComparableSpan(4)).state = tablepb.TableStateStopped + + mockTableExecutor.tables.ReplaceOrInsert( + spanz.TableIDToComparableSpan(0), tablepb.TableStatePreparing) + mockTableExecutor.tables.ReplaceOrInsert( + spanz.TableIDToComparableSpan(1), tablepb.TableStatePrepared) + mockTableExecutor.tables.ReplaceOrInsert( + spanz.TableIDToComparableSpan(2), tablepb.TableStateReplicating) + mockTableExecutor.tables.ReplaceOrInsert( + spanz.TableIDToComparableSpan(3), tablepb.TableStateStopping) + mockTableExecutor.tables.ReplaceOrInsert( + spanz.TableIDToComparableSpan(4), tablepb.TableStateStopped) heartbeat := &schedulepb.Message{ Header: &schedulepb.Message_Header{ @@ -314,7 +334,18 @@ func TestAgentHandleMessageHeartbeat(t *testing.T) { MsgType: schedulepb.MsgHeartbeat, From: "owner-1", Heartbeat: &schedulepb.Heartbeat{ - TableIDs: []model.TableID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + Spans: []tablepb.Span{ + spanz.TableIDToComparableSpan(0), + spanz.TableIDToComparableSpan(1), + spanz.TableIDToComparableSpan(2), + spanz.TableIDToComparableSpan(3), + spanz.TableIDToComparableSpan(4), + spanz.TableIDToComparableSpan(5), + spanz.TableIDToComparableSpan(6), + spanz.TableIDToComparableSpan(7), + spanz.TableIDToComparableSpan(8), + spanz.TableIDToComparableSpan(9), + }, }, } @@ -325,7 +356,7 @@ func TestAgentHandleMessageHeartbeat(t *testing.T) { result := response[0].GetHeartbeatResponse().Tables require.Len(t, result, 10) sort.Slice(result, func(i, j int) bool { - return result[i].TableID < result[j].TableID + return result[i].Span.Less(&result[j].Span) }) require.Equal(t, tablepb.TableStatePreparing, result[0].State) @@ -337,11 +368,11 @@ func TestAgentHandleMessageHeartbeat(t *testing.T) { require.Equal(t, tablepb.TableStateAbsent, result[i].State) } - a.tableM.tables[model.TableID(1)].task = &dispatchTableTask{IsRemove: true} + a.tableM.tables.GetV(spanz.TableIDToComparableSpan(1)).task = &dispatchTableTask{IsRemove: true} response = a.handleMessage([]*schedulepb.Message{heartbeat}) result = response[0].GetHeartbeatResponse().Tables sort.Slice(result, func(i, j int) bool { - return result[i].TableID < result[j].TableID + return result[i].Span.TableID < result[j].Span.TableID }) require.Equal(t, tablepb.TableStateStopping, result[1].State) @@ -376,12 +407,12 @@ func TestAgentPermuteMessages(t *testing.T) { ProcessorEpoch: a.Epoch, }, MsgType: schedulepb.MsgDispatchTableRequest, - From: a.ownerInfo.CaptureID, + From: a.ownerInfo.ID, To: a.CaptureID, DispatchTableRequest: &schedulepb.DispatchTableRequest{ Request: &schedulepb.DispatchTableRequest_RemoveTable{ RemoveTable: &schedulepb.RemoveTableRequest{ - TableID: 1, + Span: spanz.TableIDToComparableSpan(1), }, }, }, @@ -394,12 +425,12 @@ func TestAgentPermuteMessages(t *testing.T) { ProcessorEpoch: a.Epoch, }, MsgType: schedulepb.MsgDispatchTableRequest, - From: a.ownerInfo.CaptureID, + From: a.ownerInfo.ID, To: a.CaptureID, DispatchTableRequest: &schedulepb.DispatchTableRequest{ Request: &schedulepb.DispatchTableRequest_AddTable{ AddTable: &schedulepb.AddTableRequest{ - TableID: 1, + Span: spanz.TableIDToComparableSpan(1), IsSecondary: isSecondary, }, }, @@ -416,7 +447,7 @@ func TestAgentPermuteMessages(t *testing.T) { MsgType: schedulepb.MsgHeartbeat, From: "owner-1", Heartbeat: &schedulepb.Heartbeat{ - TableIDs: []model.TableID{1}, + Spans: []tablepb.Span{{TableID: 1}}, }, }) @@ -435,15 +466,20 @@ func TestAgentPermuteMessages(t *testing.T) { t.Logf("test %v, %v", state, sequence) switch state { case tablepb.TableStatePreparing: - mockTableExecutor.tables[tableID] = tablepb.TableStatePreparing + mockTableExecutor.tables.ReplaceOrInsert( + spanz.TableIDToComparableSpan(tableID), tablepb.TableStatePreparing) case tablepb.TableStatePrepared: - mockTableExecutor.tables[tableID] = tablepb.TableStatePrepared + mockTableExecutor.tables.ReplaceOrInsert( + spanz.TableIDToComparableSpan(tableID), tablepb.TableStatePrepared) case tablepb.TableStateReplicating: - mockTableExecutor.tables[tableID] = tablepb.TableStateReplicating + mockTableExecutor.tables.ReplaceOrInsert( + spanz.TableIDToComparableSpan(tableID), tablepb.TableStateReplicating) case tablepb.TableStateStopping: - mockTableExecutor.tables[tableID] = tablepb.TableStateStopping + mockTableExecutor.tables.ReplaceOrInsert( + spanz.TableIDToComparableSpan(tableID), tablepb.TableStateStopping) case tablepb.TableStateStopped: - mockTableExecutor.tables[tableID] = tablepb.TableStateStopped + mockTableExecutor.tables.ReplaceOrInsert( + spanz.TableIDToComparableSpan(tableID), tablepb.TableStateStopped) case tablepb.TableStateAbsent: default: } @@ -529,7 +565,7 @@ func TestAgentHandleMessage(t *testing.T) { OwnerRevision: a.ownerInfo.Revision, }, MsgType: schedulepb.MsgHeartbeat, - From: a.ownerInfo.CaptureID, + From: a.ownerInfo.ID, Heartbeat: &schedulepb.Heartbeat{}, } @@ -545,11 +581,11 @@ func TestAgentHandleMessage(t *testing.T) { ProcessorEpoch: schedulepb.ProcessorEpoch{Epoch: "wrong-agent-epoch-1"}, }, MsgType: schedulepb.MsgDispatchTableRequest, - From: a.ownerInfo.CaptureID, + From: a.ownerInfo.ID, DispatchTableRequest: &schedulepb.DispatchTableRequest{ Request: &schedulepb.DispatchTableRequest_AddTable{ AddTable: &schedulepb.AddTableRequest{ - TableID: 1, + Span: spanz.TableIDToComparableSpan(1), IsSecondary: true, Checkpoint: tablepb.Checkpoint{}, }, @@ -558,13 +594,13 @@ func TestAgentHandleMessage(t *testing.T) { } // wrong epoch, ignored responses := a.handleMessage([]*schedulepb.Message{addTableRequest}) - require.NotContains(t, tableM.tables, model.TableID(1)) + require.False(t, tableM.tables.Has(spanz.TableIDToComparableSpan(1))) require.Len(t, responses, 0) // correct epoch, processing. addTableRequest.Header.ProcessorEpoch = a.Epoch _ = a.handleMessage([]*schedulepb.Message{addTableRequest}) - require.Contains(t, tableM.tables, model.TableID(1)) + require.True(t, a.tableM.tables.Has(spanz.TableIDToComparableSpan(1))) heartbeat.Header.OwnerRevision.Revision = 2 response = a.handleMessage([]*schedulepb.Message{heartbeat}) @@ -578,7 +614,7 @@ func TestAgentHandleMessage(t *testing.T) { ProcessorEpoch: a.Epoch, }, MsgType: schedulepb.MsgUnknown, - From: a.ownerInfo.CaptureID, + From: a.ownerInfo.ID, } response = a.handleMessage([]*schedulepb.Message{unknownMessage}) @@ -623,8 +659,8 @@ func TestAgentTick(t *testing.T) { ProcessorEpoch: schedulepb.ProcessorEpoch{}, }, MsgType: schedulepb.MsgHeartbeat, - From: a.ownerInfo.CaptureID, - Heartbeat: &schedulepb.Heartbeat{TableIDs: nil}, + From: a.ownerInfo.ID, + Heartbeat: &schedulepb.Heartbeat{Spans: nil}, } // receive first heartbeat from the owner @@ -637,7 +673,7 @@ func TestAgentTick(t *testing.T) { trans.SendBuffer = trans.SendBuffer[:0] require.Equal(t, schedulepb.MsgHeartbeatResponse, heartbeatResponse.MsgType) - require.Equal(t, a.ownerInfo.CaptureID, heartbeatResponse.To) + require.Equal(t, a.ownerInfo.ID, heartbeatResponse.To) require.Equal(t, a.CaptureID, heartbeatResponse.From) addTableRequest := &schedulepb.Message{ @@ -647,11 +683,11 @@ func TestAgentTick(t *testing.T) { ProcessorEpoch: a.Epoch, }, MsgType: schedulepb.MsgDispatchTableRequest, - From: a.ownerInfo.CaptureID, + From: a.ownerInfo.ID, DispatchTableRequest: &schedulepb.DispatchTableRequest{ Request: &schedulepb.DispatchTableRequest_AddTable{ AddTable: &schedulepb.AddTableRequest{ - TableID: 1, + Span: spanz.TableIDToComparableSpan(1), IsSecondary: true, Checkpoint: tablepb.Checkpoint{}, }, @@ -666,11 +702,11 @@ func TestAgentTick(t *testing.T) { ProcessorEpoch: a.Epoch, }, MsgType: schedulepb.MsgDispatchTableRequest, - From: a.ownerInfo.CaptureID, + From: a.ownerInfo.ID, DispatchTableRequest: &schedulepb.DispatchTableRequest{ Request: &schedulepb.DispatchTableRequest_RemoveTable{ RemoveTable: &schedulepb.RemoveTableRequest{ - TableID: 2, + Span: tablepb.Span{TableID: 2}, }, }, }, @@ -721,7 +757,7 @@ func TestAgentHandleLivenessUpdate(t *testing.T) { ProcessorEpoch: a.Epoch, }, MsgType: schedulepb.MsgHeartbeat, - From: a.ownerInfo.CaptureID, + From: a.ownerInfo.ID, Heartbeat: &schedulepb.Heartbeat{ IsStopping: true, }, @@ -753,7 +789,7 @@ func TestAgentCommitAddTableDuringStopping(t *testing.T) { DispatchTableRequest: &schedulepb.DispatchTableRequest{ Request: &schedulepb.DispatchTableRequest_AddTable{ AddTable: &schedulepb.AddTableRequest{ - TableID: 1, + Span: spanz.TableIDToComparableSpan(1), IsSecondary: true, }, }, @@ -796,7 +832,7 @@ func TestAgentCommitAddTableDuringStopping(t *testing.T) { DispatchTableRequest: &schedulepb.DispatchTableRequest{ Request: &schedulepb.DispatchTableRequest_AddTable{ AddTable: &schedulepb.AddTableRequest{ - TableID: 1, + Span: spanz.TableIDToComparableSpan(1), IsSecondary: false, }, }, @@ -829,84 +865,173 @@ func TestAgentCommitAddTableDuringStopping(t *testing.T) { require.Equal(t, tablepb.TableStateReplicating, addTableResp.Status.State) } +func TestAgentTransportCompat(t *testing.T) { + t.Parallel() + + a := newAgent4Test() + mockTableExecutor := newMockTableExecutor() + a.tableM = newTableManager(model.ChangeFeedID{}, mockTableExecutor) + trans := transport.NewMockTrans() + a.trans = trans + a.compat = compat.New(&config.SchedulerConfig{ + RegionPerSpan: 1, + }, map[model.CaptureID]*model.CaptureInfo{}) + ctx := context.Background() + + // Disable span replication. + a.handleOwnerInfo("a", a.ownerInfo.Revision.Revision+1, "4.0.0") + require.False(t, a.compat.CheckSpanReplicationEnabled()) + + // Test compat.BeforeTransportSend. + a.sendMsgs( + ctx, []*schedulepb.Message{{ + Header: &schedulepb.Message_Header{ + Version: a.Version, + ProcessorEpoch: a.Epoch, + }, + From: a.CaptureID, To: "a", MsgType: schedulepb.MsgDispatchTableResponse, + DispatchTableResponse: &schedulepb.DispatchTableResponse{ + Response: &schedulepb.DispatchTableResponse_AddTable{ + AddTable: &schedulepb.AddTableResponse{ + Status: &tablepb.TableStatus{ + Span: spanz.TableIDToComparableSpan(1), + }, + }, + }, + }, + }}) + require.EqualValues(t, []*schedulepb.Message{{ + Header: &schedulepb.Message_Header{ + Version: a.Version, + ProcessorEpoch: a.Epoch, + OwnerRevision: a.ownerInfo.Revision, + }, + From: a.CaptureID, To: "a", MsgType: schedulepb.MsgDispatchTableResponse, + DispatchTableResponse: &schedulepb.DispatchTableResponse{ + Response: &schedulepb.DispatchTableResponse_AddTable{ + AddTable: &schedulepb.AddTableResponse{ + Status: &tablepb.TableStatus{ + TableID: 1, + Span: spanz.TableIDToComparableSpan(1), + }, + }, + }, + }, + }}, trans.SendBuffer) + // Test compat.AfterTransportReceive. + trans.RecvBuffer = append(trans.RecvBuffer, &schedulepb.Message{ + Header: &schedulepb.Message_Header{ + Version: a.Version, + OwnerRevision: a.ownerInfo.Revision, + }, + From: "a", To: a.CaptureID, MsgType: schedulepb.MsgDispatchTableRequest, + DispatchTableRequest: &schedulepb.DispatchTableRequest{ + Request: &schedulepb.DispatchTableRequest_AddTable{ + AddTable: &schedulepb.AddTableRequest{ + TableID: 1, + }, + }, + }, + }) + msgs, err := a.recvMsgs(ctx) + require.NoError(t, err) + require.EqualValues(t, []*schedulepb.Message{{ + Header: &schedulepb.Message_Header{ + Version: a.Version, + OwnerRevision: a.ownerInfo.Revision, + }, + From: "a", To: a.CaptureID, MsgType: schedulepb.MsgDispatchTableRequest, + DispatchTableRequest: &schedulepb.DispatchTableRequest{ + Request: &schedulepb.DispatchTableRequest_AddTable{ + AddTable: &schedulepb.AddTableRequest{ + TableID: 1, + Span: spanz.TableIDToComparableSpan(1), + }, + }, + }, + }}, msgs) +} + // MockTableExecutor is a mock implementation of TableExecutor. type MockTableExecutor struct { mock.Mock // it's preferred to use `pipeline.MockPipeline` here to make the test more vivid. - tables map[model.TableID]tablepb.TableState + tables *spanz.Map[tablepb.TableState] } +var _ internal.TableExecutor = (*MockTableExecutor)(nil) + // newMockTableExecutor creates a new mock table executor. func newMockTableExecutor() *MockTableExecutor { return &MockTableExecutor{ - tables: map[model.TableID]tablepb.TableState{}, + tables: spanz.NewMap[tablepb.TableState](), } } // AddTable adds a table to the executor. func (e *MockTableExecutor) AddTable( - ctx context.Context, tableID model.TableID, startTs model.Ts, isPrepare bool, + ctx context.Context, tableID tablepb.Span, startTs model.Ts, isPrepare bool, ) (bool, error) { log.Info("AddTable", - zap.Int64("tableID", tableID), + zap.Stringer("span", &tableID), zap.Any("startTs", startTs), zap.Bool("isPrepare", isPrepare)) - state, ok := e.tables[tableID] + state, ok := e.tables.Get(tableID) if ok { switch state { case tablepb.TableStatePreparing: return true, nil case tablepb.TableStatePrepared: if !isPrepare { - e.tables[tableID] = tablepb.TableStateReplicating + e.tables.ReplaceOrInsert(tableID, tablepb.TableStateReplicating) } return true, nil case tablepb.TableStateReplicating: return true, nil case tablepb.TableStateStopped: - delete(e.tables, tableID) + e.tables.Delete(tableID) } } args := e.Called(ctx, tableID, startTs, isPrepare) if args.Bool(0) { - e.tables[tableID] = tablepb.TableStatePreparing + e.tables.ReplaceOrInsert(tableID, tablepb.TableStatePreparing) } return args.Bool(0), args.Error(1) } // IsAddTableFinished determines if the table has been added. -func (e *MockTableExecutor) IsAddTableFinished(tableID model.TableID, isPrepare bool) bool { - _, ok := e.tables[tableID] +func (e *MockTableExecutor) IsAddTableFinished(tableID tablepb.Span, isPrepare bool) bool { + _, ok := e.tables.Get(tableID) if !ok { log.Panic("table which was added is not found", - zap.Int64("tableID", tableID), + zap.Stringer("span", &tableID), zap.Bool("isPrepare", isPrepare)) } args := e.Called(tableID, isPrepare) if args.Bool(0) { - e.tables[tableID] = tablepb.TableStatePrepared + e.tables.ReplaceOrInsert(tableID, tablepb.TableStatePrepared) if !isPrepare { - e.tables[tableID] = tablepb.TableStateReplicating + e.tables.ReplaceOrInsert(tableID, tablepb.TableStateReplicating) } return true } - e.tables[tableID] = tablepb.TableStatePreparing + e.tables.ReplaceOrInsert(tableID, tablepb.TableStatePreparing) if !isPrepare { - e.tables[tableID] = tablepb.TableStatePrepared + e.tables.ReplaceOrInsert(tableID, tablepb.TableStatePrepared) } return false } // RemoveTable removes a table from the executor. -func (e *MockTableExecutor) RemoveTable(tableID model.TableID) bool { - state, ok := e.tables[tableID] +func (e *MockTableExecutor) RemoveTable(tableID tablepb.Span) bool { + state, ok := e.tables.Get(tableID) if !ok { - log.Warn("table to be remove is not found", zap.Int64("tableID", tableID)) + log.Warn("table to be remove is not found", zap.Stringer("span", &tableID)) return true } switch state { @@ -916,44 +1041,45 @@ func (e *MockTableExecutor) RemoveTable(tableID model.TableID) bool { default: } // the current `processor implementation, does not consider table's state - log.Info("RemoveTable", zap.Int64("tableID", tableID), zap.Any("state", state)) + log.Info("RemoveTable", zap.Stringer("span", &tableID), zap.Any("state", state)) args := e.Called(tableID) if args.Bool(0) { - e.tables[tableID] = tablepb.TableStateStopped + e.tables.ReplaceOrInsert(tableID, tablepb.TableStateStopped) } return args.Bool(0) } // IsRemoveTableFinished determines if the table has been removed. -func (e *MockTableExecutor) IsRemoveTableFinished(tableID model.TableID) (model.Ts, bool) { - state, ok := e.tables[tableID] +func (e *MockTableExecutor) IsRemoveTableFinished(tableID tablepb.Span) (model.Ts, bool) { + state, ok := e.tables.Get(tableID) if !ok { // the real `table executor` processor, would panic in such case. log.Warn("table to be removed is not found", - zap.Int64("tableID", tableID)) + zap.Stringer("span", &tableID)) return 0, true } args := e.Called(tableID) if args.Bool(1) { log.Info("remove table finished, remove it from the executor", - zap.Int64("tableID", tableID), zap.Any("state", state)) - delete(e.tables, tableID) + zap.Stringer("span", &tableID), zap.Any("state", state)) + e.tables.Delete(tableID) } else { // revert the state back to old state, assume it's `replicating`, // but `preparing` / `prepared` can also be removed. - e.tables[tableID] = tablepb.TableStateReplicating + e.tables.ReplaceOrInsert(tableID, tablepb.TableStateReplicating) } return model.Ts(args.Int(0)), args.Bool(1) } // GetAllCurrentTables returns all tables that are currently being adding, running, or removing. -func (e *MockTableExecutor) GetAllCurrentTables() []model.TableID { - var result []model.TableID - for tableID := range e.tables { - result = append(result, tableID) - } +func (e *MockTableExecutor) GetAllCurrentTables() int { + var result int + e.tables.Ascend(func(span tablepb.Span, value tablepb.TableState) bool { + result++ + return true + }) return result } @@ -964,13 +1090,13 @@ func (e *MockTableExecutor) GetCheckpoint() (checkpointTs, resolvedTs model.Ts) } // GetTableStatus implements TableExecutor interface -func (e *MockTableExecutor) GetTableStatus(tableID model.TableID) tablepb.TableStatus { - state, ok := e.tables[tableID] +func (e *MockTableExecutor) GetTableStatus(span tablepb.Span) tablepb.TableStatus { + state, ok := e.tables.Get(span) if !ok { state = tablepb.TableStateAbsent } return tablepb.TableStatus{ - TableID: tableID, - State: state, + Span: span, + State: state, } } diff --git a/cdc/scheduler/internal/v3/agent/table.go b/cdc/scheduler/internal/v3/agent/table.go index a1d933ae174..a41b95fce02 100644 --- a/cdc/scheduler/internal/v3/agent/table.go +++ b/cdc/scheduler/internal/v3/agent/table.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/tiflow/cdc/processor/tablepb" "github.com/pingcap/tiflow/cdc/scheduler/internal" "github.com/pingcap/tiflow/cdc/scheduler/schedulepb" + "github.com/pingcap/tiflow/pkg/spanz" "go.uber.org/zap" ) @@ -29,7 +30,7 @@ import ( // also tracking its progress by utilize the `TableExecutor` type table struct { changefeedID model.ChangeFeedID - id model.TableID + span tablepb.Span state tablepb.TableState executor internal.TableExecutor @@ -38,11 +39,11 @@ type table struct { } func newTable( - changefeed model.ChangeFeedID, tableID model.TableID, executor internal.TableExecutor, + changefeed model.ChangeFeedID, span tablepb.Span, executor internal.TableExecutor, ) *table { return &table{ changefeedID: changefeed, - id: tableID, + span: span, state: tablepb.TableStateAbsent, // use `absent` as the default state. executor: executor, task: nil, @@ -53,14 +54,14 @@ func newTable( func (t *table) getAndUpdateTableState() (tablepb.TableState, bool) { oldState := t.state - meta := t.executor.GetTableStatus(t.id) + meta := t.executor.GetTableStatus(t.span) t.state = meta.State if oldState != t.state { log.Debug("schedulerv3: table state changed", zap.String("namespace", t.changefeedID.Namespace), zap.String("changefeed", t.changefeedID.ID), - zap.Int64("tableID", t.id), + zap.Int64("tableID", t.span.TableID), zap.Stringer("oldState", oldState), zap.Stringer("state", t.state)) return t.state, true @@ -69,7 +70,7 @@ func (t *table) getAndUpdateTableState() (tablepb.TableState, bool) { } func (t *table) getTableStatus() tablepb.TableStatus { - return t.executor.GetTableStatus(t.id) + return t.executor.GetTableStatus(t.span) } func newAddTableResponseMessage(status tablepb.TableStatus) *schedulepb.Message { @@ -111,14 +112,14 @@ func (t *table) handleRemoveTableTask() *schedulepb.Message { log.Warn("schedulerv3: remove table, but table is absent", zap.String("namespace", t.changefeedID.Namespace), zap.String("changefeed", t.changefeedID.ID), - zap.Int64("tableID", t.id)) + zap.Int64("tableID", t.span.TableID)) t.task = nil return newRemoveTableResponseMessage(t.getTableStatus()) case tablepb.TableStateStopping, // stopping now is useless tablepb.TableStateStopped: // release table resource, and get the latest checkpoint // this will let the table become `absent` - checkpointTs, done := t.executor.IsRemoveTableFinished(t.id) + checkpointTs, done := t.executor.IsRemoveTableFinished(t.span) if !done { // actually, this should never be hit, since we know that table is stopped. status := t.getTableStatus() @@ -133,7 +134,7 @@ func (t *table) handleRemoveTableTask() *schedulepb.Message { case tablepb.TableStatePreparing, tablepb.TableStatePrepared, tablepb.TableStateReplicating: - done := t.executor.RemoveTable(t.task.TableID) + done := t.executor.RemoveTable(t.task.Span) if !done { status := t.getTableStatus() status.State = tablepb.TableStateStopping @@ -144,7 +145,7 @@ func (t *table) handleRemoveTableTask() *schedulepb.Message { log.Panic("schedulerv3: unknown table state", zap.String("namespace", t.changefeedID.Namespace), zap.String("changefeed", t.changefeedID.ID), - zap.Int64("tableID", t.id), zap.Stringer("state", state)) + zap.Int64("tableID", t.span.TableID), zap.Stringer("state", state)) } } return nil @@ -156,12 +157,12 @@ func (t *table) handleAddTableTask(ctx context.Context) (result *schedulepb.Mess for changed { switch state { case tablepb.TableStateAbsent: - done, err := t.executor.AddTable(ctx, t.task.TableID, t.task.StartTs, t.task.IsPrepare) + done, err := t.executor.AddTable(ctx, t.task.Span, t.task.StartTs, t.task.IsPrepare) if err != nil || !done { log.Warn("schedulerv3: agent add table failed", zap.String("namespace", t.changefeedID.Namespace), zap.String("changefeed", t.changefeedID.ID), - zap.Int64("tableID", t.id), zap.Any("task", t.task), + zap.Int64("tableID", t.span.TableID), zap.Any("task", t.task), zap.Error(err)) status := t.getTableStatus() return newAddTableResponseMessage(status), errors.Trace(err) @@ -171,7 +172,7 @@ func (t *table) handleAddTableTask(ctx context.Context) (result *schedulepb.Mess log.Info("schedulerv3: table is replicating", zap.String("namespace", t.changefeedID.Namespace), zap.String("changefeed", t.changefeedID.ID), - zap.Int64("tableID", t.id), zap.Stringer("state", state)) + zap.Int64("tableID", t.span.TableID), zap.Stringer("state", state)) t.task = nil status := t.getTableStatus() return newAddTableResponseMessage(status), nil @@ -181,18 +182,18 @@ func (t *table) handleAddTableTask(ctx context.Context) (result *schedulepb.Mess log.Info("schedulerv3: table is prepared", zap.String("namespace", t.changefeedID.Namespace), zap.String("changefeed", t.changefeedID.ID), - zap.Int64("tableID", t.id), zap.Stringer("state", state)) + zap.Int64("tableID", t.span.TableID), zap.Stringer("state", state)) t.task = nil return newAddTableResponseMessage(t.getTableStatus()), nil } if t.task.status == dispatchTableTaskReceived { - done, err := t.executor.AddTable(ctx, t.task.TableID, t.task.StartTs, false) + done, err := t.executor.AddTable(ctx, t.task.Span, t.task.StartTs, false) if err != nil || !done { log.Warn("schedulerv3: agent add table failed", zap.String("namespace", t.changefeedID.Namespace), zap.String("changefeed", t.changefeedID.ID), - zap.Int64("tableID", t.id), zap.Stringer("state", state), + zap.Int64("tableID", t.span.TableID), zap.Stringer("state", state), zap.Error(err)) status := t.getTableStatus() return newAddTableResponseMessage(status), errors.Trace(err) @@ -200,7 +201,7 @@ func (t *table) handleAddTableTask(ctx context.Context) (result *schedulepb.Mess t.task.status = dispatchTableTaskProcessed } - done := t.executor.IsAddTableFinished(t.task.TableID, false) + done := t.executor.IsAddTableFinished(t.task.Span, false) if !done { return newAddTableResponseMessage(t.getTableStatus()), nil } @@ -208,7 +209,7 @@ func (t *table) handleAddTableTask(ctx context.Context) (result *schedulepb.Mess case tablepb.TableStatePreparing: // `preparing` is not stable state and would last a long time, // it's no need to return such a state, to make the coordinator become burdensome. - done := t.executor.IsAddTableFinished(t.task.TableID, t.task.IsPrepare) + done := t.executor.IsAddTableFinished(t.task.Span, t.task.IsPrepare) if !done { return nil, nil } @@ -216,20 +217,20 @@ func (t *table) handleAddTableTask(ctx context.Context) (result *schedulepb.Mess log.Info("schedulerv3: add table finished", zap.String("namespace", t.changefeedID.Namespace), zap.String("changefeed", t.changefeedID.ID), - zap.Int64("tableID", t.id), zap.Stringer("state", state)) + zap.Int64("tableID", t.span.TableID), zap.Stringer("state", state)) case tablepb.TableStateStopping, tablepb.TableStateStopped: log.Warn("schedulerv3: ignore add table", zap.String("namespace", t.changefeedID.Namespace), zap.String("changefeed", t.changefeedID.ID), - zap.Int64("tableID", t.id)) + zap.Int64("tableID", t.span.TableID)) t.task = nil return newAddTableResponseMessage(t.getTableStatus()), nil default: log.Panic("schedulerv3: unknown table state", zap.String("namespace", t.changefeedID.Namespace), zap.String("changefeed", t.changefeedID.ID), - zap.Int64("tableID", t.id)) + zap.Int64("tableID", t.span.TableID)) } } @@ -237,18 +238,18 @@ func (t *table) handleAddTableTask(ctx context.Context) (result *schedulepb.Mess } func (t *table) injectDispatchTableTask(task *dispatchTableTask) { - if t.id != task.TableID { + if !t.span.Eq(&task.Span) { log.Panic("schedulerv3: tableID not match", zap.String("namespace", t.changefeedID.Namespace), zap.String("changefeed", t.changefeedID.ID), - zap.Int64("tableID", t.id), - zap.Int64("task.TableID", task.TableID)) + zap.Int64("tableID", t.span.TableID), + zap.Stringer("task.TableID", &task.Span)) } if t.task == nil { log.Info("schedulerv3: table found new task", zap.String("namespace", t.changefeedID.Namespace), zap.String("changefeed", t.changefeedID.ID), - zap.Int64("tableID", t.id), + zap.Int64("tableID", t.span.TableID), zap.Any("task", task)) t.task = task return @@ -257,7 +258,7 @@ func (t *table) injectDispatchTableTask(task *dispatchTableTask) { "since there is one not finished yet", zap.String("namespace", t.changefeedID.Namespace), zap.String("changefeed", t.changefeedID.ID), - zap.Int64("tableID", t.id), + zap.Int64("tableID", t.span.TableID), zap.Any("nowTask", t.task), zap.Any("ignoredTask", task)) } @@ -273,7 +274,7 @@ func (t *table) poll(ctx context.Context) (*schedulepb.Message, error) { } type tableManager struct { - tables map[model.TableID]*table + tables *spanz.Map[*table] executor internal.TableExecutor changefeedID model.ChangeFeedID @@ -283,7 +284,7 @@ func newTableManager( changefeed model.ChangeFeedID, executor internal.TableExecutor, ) *tableManager { return &tableManager{ - tables: make(map[model.TableID]*table), + tables: spanz.NewMap[*table](), executor: executor, changefeedID: changefeed, } @@ -291,54 +292,61 @@ func newTableManager( func (tm *tableManager) poll(ctx context.Context) ([]*schedulepb.Message, error) { result := make([]*schedulepb.Message, 0) - for tableID, table := range tm.tables { + var err error + toBeDropped := []tablepb.Span{} + tm.tables.Ascend(func(span tablepb.Span, table *table) bool { message, err := table.poll(ctx) if err != nil { - return result, errors.Trace(err) + err = errors.Trace(err) + return false } state, _ := table.getAndUpdateTableState() if state == tablepb.TableStateAbsent { - tm.dropTable(tableID) + toBeDropped = append(toBeDropped, span) } if message == nil { - continue + return true } result = append(result, message) + return true + }) + for _, span := range toBeDropped { + tm.dropTable(span) } - return result, nil + return result, err } -func (tm *tableManager) getAllTables() map[model.TableID]*table { +func (tm *tableManager) getAllTables() *spanz.Map[*table] { return tm.tables } // addTable add the target table, and return it. -func (tm *tableManager) addTable(tableID model.TableID) *table { - table, ok := tm.tables[tableID] +func (tm *tableManager) addTable(span tablepb.Span) *table { + table, ok := tm.tables.Get(span) if !ok { - table = newTable(tm.changefeedID, tableID, tm.executor) - tm.tables[tableID] = table + table = newTable(tm.changefeedID, span, tm.executor) + tm.tables.ReplaceOrInsert(span, table) } return table } -func (tm *tableManager) getTable(tableID model.TableID) (*table, bool) { - table, ok := tm.tables[tableID] +func (tm *tableManager) getTable(span tablepb.Span) (*table, bool) { + table, ok := tm.tables.Get(span) if ok { return table, true } return nil, false } -func (tm *tableManager) dropTable(tableID model.TableID) { - table, ok := tm.tables[tableID] +func (tm *tableManager) dropTable(span tablepb.Span) { + table, ok := tm.tables.Get(span) if !ok { log.Warn("schedulerv3: tableManager drop table not found", zap.String("namespace", tm.changefeedID.Namespace), zap.String("changefeed", tm.changefeedID.ID), - zap.Int64("tableID", tableID)) + zap.Stringer("span", &span)) return } state, _ := table.getAndUpdateTableState() @@ -346,25 +354,25 @@ func (tm *tableManager) dropTable(tableID model.TableID) { log.Panic("schedulerv3: tableManager drop table undesired", zap.String("namespace", tm.changefeedID.Namespace), zap.String("changefeed", tm.changefeedID.ID), - zap.Int64("tableID", tableID), + zap.Stringer("span", &span), zap.Stringer("state", table.state)) } log.Debug("schedulerv3: tableManager drop table", zap.String("namespace", tm.changefeedID.Namespace), zap.String("changefeed", tm.changefeedID.ID), - zap.Int64("tableID", tableID)) - delete(tm.tables, tableID) + zap.Stringer("span", &span)) + tm.tables.Delete(span) } -func (tm *tableManager) getTableStatus(tableID model.TableID) tablepb.TableStatus { - table, ok := tm.getTable(tableID) +func (tm *tableManager) getTableStatus(span tablepb.Span) tablepb.TableStatus { + table, ok := tm.getTable(span) if ok { return table.getTableStatus() } return tablepb.TableStatus{ - TableID: tableID, - State: tablepb.TableStateAbsent, + Span: span, + State: tablepb.TableStateAbsent, } } diff --git a/cdc/scheduler/internal/v3/agent/table_test.go b/cdc/scheduler/internal/v3/agent/table_test.go index 077ede1dcb5..98eb9ee62d7 100644 --- a/cdc/scheduler/internal/v3/agent/table_test.go +++ b/cdc/scheduler/internal/v3/agent/table_test.go @@ -18,6 +18,7 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/processor/tablepb" + "github.com/pingcap/tiflow/pkg/spanz" "github.com/stretchr/testify/require" ) @@ -29,9 +30,10 @@ func TestTableManager(t *testing.T) { tableM := newTableManager(model.ChangeFeedID{}, mockTableExecutor) - tableM.addTable(model.TableID(1)) - require.Equal(t, tablepb.TableStateAbsent, tableM.tables[model.TableID(1)].state) + span1 := spanz.TableIDToComparableSpan(1) + tableM.addTable(span1) + require.Equal(t, tablepb.TableStateAbsent, tableM.tables.GetV(span1).state) - tableM.dropTable(model.TableID(1)) - require.NotContains(t, tableM.tables, model.TableID(1)) + tableM.dropTable(span1) + require.False(t, tableM.tables.Has(span1)) } diff --git a/cdc/scheduler/rexport.go b/cdc/scheduler/rexport.go index fe821e915f9..6a6f1b18f95 100644 --- a/cdc/scheduler/rexport.go +++ b/cdc/scheduler/rexport.go @@ -69,11 +69,11 @@ func NewAgent( etcdClient etcd.CDCEtcdClient, executor TableExecutor, changefeedID model.ChangeFeedID, + cfg *config.SchedulerConfig, ) (Agent, error) { return v3agent.NewAgent( ctx, captureID, liveness, changefeedID, - messageServer, messageRouter, etcdClient, executor, - ) + messageServer, messageRouter, etcdClient, executor, cfg) } // NewScheduler returns two-phase scheduler. From e3e968104fc1833360f8e2538c732d2049640dc1 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Wed, 7 Dec 2022 13:44:45 +0800 Subject: [PATCH 2/4] address comments and lints Signed-off-by: Neil Shen --- cdc/processor/manager.go | 2 +- cdc/processor/processor.go | 4 ++-- cdc/processor/processor_test.go | 2 +- cdc/scheduler/internal/table_executor.go | 6 +++--- cdc/scheduler/internal/v3/agent/agent_test.go | 4 ++-- cdc/scheduler/internal/v3/agent/table.go | 4 ++-- 6 files changed, 11 insertions(+), 11 deletions(-) diff --git a/cdc/processor/manager.go b/cdc/processor/manager.go index f6e62e1bc37..bc17e35e44e 100644 --- a/cdc/processor/manager.go +++ b/cdc/processor/manager.go @@ -243,7 +243,7 @@ func (m *managerImpl) handleCommand(ctx cdcContext.Context) error { case commandTpQueryTableCount: count := 0 for _, p := range m.processors { - count += p.GetAllCurrentTables() + count += p.GetTableCount() } select { case cmd.payload.(chan int) <- count: diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 47e503808fc..0643792ccd7 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -447,8 +447,8 @@ func (p *processor) IsRemoveTableFinished(span tablepb.Span) (model.Ts, bool) { return checkpointTs, true } -// GetAllCurrentTables implements TableExecutor interface. -func (p *processor) GetAllCurrentTables() int { +// GetTableCount implements TableExecutor interface. +func (p *processor) GetTableCount() int { if p.pullBasedSinking { return len(p.sinkManager.GetAllCurrentTableIDs()) } diff --git a/cdc/processor/processor_test.go b/cdc/processor/processor_test.go index 1c2ab2862f3..d6d0deb6153 100644 --- a/cdc/processor/processor_test.go +++ b/cdc/processor/processor_test.go @@ -249,7 +249,7 @@ type mockAgent struct { } func (a *mockAgent) Tick(_ context.Context) error { - if a.executor.GetAllCurrentTables() == 0 { + if a.executor.GetTableCount() == 0 { return nil } a.lastCheckpointTs, _ = a.executor.GetCheckpoint() diff --git a/cdc/scheduler/internal/table_executor.go b/cdc/scheduler/internal/table_executor.go index 680c82eb424..2848e8cb67a 100644 --- a/cdc/scheduler/internal/table_executor.go +++ b/cdc/scheduler/internal/table_executor.go @@ -43,17 +43,17 @@ type TableExecutor interface { // return true and corresponding checkpoint otherwise. IsRemoveTableFinished(span tablepb.Span) (model.Ts, bool) - // GetAllCurrentTables should return all tables that are being run, + // GetTableCount should return the number of tables that are being run, // being added and being removed. // // NOTE: two subsequent calls to the method should return the same // result, unless there is a call to AddTable, RemoveTable, IsAddTableFinished // or IsRemoveTableFinished in between two calls to this method. - GetAllCurrentTables() int + GetTableCount() int // GetCheckpoint returns the local checkpoint-ts and resolved-ts of // the processor. Its calculation should take into consideration all - // tables that would have been returned if GetAllCurrentTables had been + // tables that would have been returned if GetTableCount had been // called immediately before. GetCheckpoint() (checkpointTs, resolvedTs model.Ts) diff --git a/cdc/scheduler/internal/v3/agent/agent_test.go b/cdc/scheduler/internal/v3/agent/agent_test.go index 15930f498fc..04a6ff10760 100644 --- a/cdc/scheduler/internal/v3/agent/agent_test.go +++ b/cdc/scheduler/internal/v3/agent/agent_test.go @@ -1073,8 +1073,8 @@ func (e *MockTableExecutor) IsRemoveTableFinished(tableID tablepb.Span) (model.T return model.Ts(args.Int(0)), args.Bool(1) } -// GetAllCurrentTables returns all tables that are currently being adding, running, or removing. -func (e *MockTableExecutor) GetAllCurrentTables() int { +// GetTableCount returns all tables that are currently being adding, running, or removing. +func (e *MockTableExecutor) GetTableCount() int { var result int e.tables.Ascend(func(span tablepb.Span, value tablepb.TableState) bool { result++ diff --git a/cdc/scheduler/internal/v3/agent/table.go b/cdc/scheduler/internal/v3/agent/table.go index a41b95fce02..75260586fe7 100644 --- a/cdc/scheduler/internal/v3/agent/table.go +++ b/cdc/scheduler/internal/v3/agent/table.go @@ -295,9 +295,9 @@ func (tm *tableManager) poll(ctx context.Context) ([]*schedulepb.Message, error) var err error toBeDropped := []tablepb.Span{} tm.tables.Ascend(func(span tablepb.Span, table *table) bool { - message, err := table.poll(ctx) + message, err1 := table.poll(ctx) if err != nil { - err = errors.Trace(err) + err = errors.Trace(err1) return false } From aee994a3af756a8c1eb92ae407a137b4ef25c831 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Wed, 7 Dec 2022 17:13:55 +0800 Subject: [PATCH 3/4] address comments Signed-off-by: Neil Shen --- Makefile | 1 + cdc/processor/manager.go | 2 +- cdc/processor/processor.go | 26 ++-- cdc/processor/processor_test.go | 63 +++++----- cdc/processor/tablepb/table.proto | 2 + cdc/scheduler/internal/table_executor.go | 28 ++--- cdc/scheduler/internal/v3/agent/agent.go | 18 +-- .../internal/v3/agent/agent_bench_test.go | 4 +- cdc/scheduler/internal/v3/agent/agent_test.go | 97 +++++++-------- cdc/scheduler/internal/v3/agent/table.go | 116 +++++++++--------- cdc/scheduler/internal/v3/agent/table_test.go | 6 +- cdc/scheduler/schedulepb/table_schedule.proto | 3 + 12 files changed, 187 insertions(+), 179 deletions(-) diff --git a/Makefile b/Makefile index 5d86c623d1b..d162a11784f 100644 --- a/Makefile +++ b/Makefile @@ -242,6 +242,7 @@ fmt: tools/bin/gofumports tools/bin/shfmt tools/bin/gci tools/bin/shfmt -d -w . @echo "check log style" scripts/check-log-style.sh + @make check-diff-line-width errdoc: tools/bin/errdoc-gen @echo "generator errors.toml" diff --git a/cdc/processor/manager.go b/cdc/processor/manager.go index bc17e35e44e..84b8ac42a06 100644 --- a/cdc/processor/manager.go +++ b/cdc/processor/manager.go @@ -243,7 +243,7 @@ func (m *managerImpl) handleCommand(ctx cdcContext.Context) error { case commandTpQueryTableCount: count := 0 for _, p := range m.processors { - count += p.GetTableCount() + count += p.GetTableSpanCount() } select { case cmd.payload.(chan int) <- count: diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 8d469eea071..89f821c693d 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -120,12 +120,12 @@ func (p *processor) checkReadyForMessages() bool { var _ scheduler.TableExecutor = (*processor)(nil) -// AddTable implements TableExecutor interface. -// AddTable may cause by the following scenario +// AddTableSpan implements TableExecutor interface. +// AddTableSpan may cause by the following scenario // 1. `Create Table`, a new table dispatched to the processor, `isPrepare` should be false // 2. Prepare phase for 2 phase scheduling, `isPrepare` should be true. // 3. Replicating phase for 2 phase scheduling, `isPrepare` should be false -func (p *processor) AddTable( +func (p *processor) AddTableSpan( ctx context.Context, span tablepb.Span, startTs model.Ts, isPrepare bool, ) (bool, error) { if !p.checkReadyForMessages() { @@ -249,8 +249,8 @@ func (p *processor) AddTable( return true, nil } -// RemoveTable implements TableExecutor interface. -func (p *processor) RemoveTable(span tablepb.Span) bool { +// RemoveTableSpan implements TableExecutor interface. +func (p *processor) RemoveTableSpan(span tablepb.Span) bool { if !p.checkReadyForMessages() { return false } @@ -291,8 +291,8 @@ func (p *processor) RemoveTable(span tablepb.Span) bool { return true } -// IsAddTableFinished implements TableExecutor interface. -func (p *processor) IsAddTableFinished(span tablepb.Span, isPrepare bool) bool { +// IsAddTableSpanFinished implements TableExecutor interface. +func (p *processor) IsAddTableSpanFinished(span tablepb.Span, isPrepare bool) bool { if !p.checkReadyForMessages() { return false } @@ -372,8 +372,8 @@ func (p *processor) IsAddTableFinished(span tablepb.Span, isPrepare bool) bool { return true } -// IsRemoveTableFinished implements TableExecutor interface. -func (p *processor) IsRemoveTableFinished(span tablepb.Span) (model.Ts, bool) { +// IsRemoveTableSpanFinished implements TableExecutor interface. +func (p *processor) IsRemoveTableSpanFinished(span tablepb.Span) (model.Ts, bool) { if !p.checkReadyForMessages() { return 0, false } @@ -448,8 +448,8 @@ func (p *processor) IsRemoveTableFinished(span tablepb.Span) (model.Ts, bool) { return checkpointTs, true } -// GetTableCount implements TableExecutor interface. -func (p *processor) GetTableCount() int { +// GetTableSpanCount implements TableExecutor interface. +func (p *processor) GetTableSpanCount() int { if p.pullBasedSinking { return len(p.sinkManager.GetAllCurrentTableIDs()) } @@ -461,8 +461,8 @@ func (p *processor) GetCheckpoint() (checkpointTs, resolvedTs model.Ts) { return p.checkpointTs, p.resolvedTs } -// GetTableStatus implements TableExecutor interface -func (p *processor) GetTableStatus(span tablepb.Span) tablepb.TableStatus { +// GetTableSpanStatus implements TableExecutor interface +func (p *processor) GetTableSpanStatus(span tablepb.Span) tablepb.TableStatus { if p.pullBasedSinking { state, exist := p.sinkManager.GetTableState(span.TableID) if !exist { diff --git a/cdc/processor/processor_test.go b/cdc/processor/processor_test.go index d6d0deb6153..f6bb6a89dad 100644 --- a/cdc/processor/processor_test.go +++ b/cdc/processor/processor_test.go @@ -39,7 +39,6 @@ import ( ) // processor needs to implement TableExecutor. -var _ scheduler.TableExecutor = (*processor)(nil) func newProcessor4Test( t *testing.T, @@ -249,7 +248,7 @@ type mockAgent struct { } func (a *mockAgent) Tick(_ context.Context) error { - if a.executor.GetTableCount() == 0 { + if a.executor.GetTableSpanCount() == 0 { return nil } a.lastCheckpointTs, _ = a.executor.GetCheckpoint() @@ -288,7 +287,7 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) { tester.MustApplyPatches() // table-1: `preparing` -> `prepared` -> `replicating` - ok, err := p.AddTable(ctx, spanz.TableIDToComparableSpan(1), 20, true) + ok, err := p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), 20, true) require.NoError(t, err) require.True(t, ok) @@ -302,7 +301,7 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) { checkpointTs := p.agent.GetLastSentCheckpointTs() require.Equal(t, checkpointTs, model.Ts(0)) - done := p.IsAddTableFinished(spanz.TableIDToComparableSpan(1), true) + done := p.IsAddTableSpanFinished(spanz.TableIDToComparableSpan(1), true) require.False(t, done) require.Equal(t, tablepb.TableStatePreparing, table1.State()) @@ -313,7 +312,7 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) { require.Nil(t, err) tester.MustApplyPatches() - done = p.IsAddTableFinished(spanz.TableIDToComparableSpan(1), true) + done = p.IsAddTableSpanFinished(spanz.TableIDToComparableSpan(1), true) require.True(t, done) require.Equal(t, tablepb.TableStatePrepared, table1.State()) @@ -321,12 +320,12 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) { checkpointTs = p.agent.GetLastSentCheckpointTs() require.Equal(t, checkpointTs, model.Ts(20)) - ok, err = p.AddTable(ctx, spanz.TableIDToComparableSpan(1), 30, true) + ok, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), 30, true) require.NoError(t, err) require.True(t, ok) require.Equal(t, model.Ts(0), table1.sinkStartTs) - ok, err = p.AddTable(ctx, spanz.TableIDToComparableSpan(1), 30, false) + ok, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), 30, false) require.NoError(t, err) require.True(t, ok) require.Equal(t, model.Ts(30), table1.sinkStartTs) @@ -337,7 +336,7 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) { require.Nil(t, err) tester.MustApplyPatches() - done = p.IsAddTableFinished(spanz.TableIDToComparableSpan(1), false) + done = p.IsAddTableSpanFinished(spanz.TableIDToComparableSpan(1), false) require.True(t, done) require.Equal(t, tablepb.TableStateReplicating, table1.State()) @@ -371,33 +370,33 @@ func TestTableExecutorAddingTableDirectly(t *testing.T) { require.NoError(t, err) tester.MustApplyPatches() - ok, err := p.AddTable(ctx, spanz.TableIDToComparableSpan(1), 20, false) + ok, err := p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), 20, false) require.NoError(t, err) require.True(t, ok) table1 := p.tables[1].(*mockTablePipeline) require.Equal(t, model.Ts(20), table1.sinkStartTs) require.Equal(t, tablepb.TableStatePreparing, table1.state) - meta := p.GetTableStatus(spanz.TableIDToComparableSpan(1)) + meta := p.GetTableSpanStatus(spanz.TableIDToComparableSpan(1)) require.Equal(t, model.TableID(1), meta.TableID) require.Equal(t, spanz.TableIDToComparableSpan(1), meta.Span) require.Equal(t, tablepb.TableStatePreparing, meta.State) - ok, err = p.AddTable(ctx, spanz.TableIDToComparableSpan(2), 20, false) + ok, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(2), 20, false) require.NoError(t, err) require.True(t, ok) table2 := p.tables[2].(*mockTablePipeline) require.Equal(t, model.Ts(20), table2.sinkStartTs) require.Equal(t, tablepb.TableStatePreparing, table2.state) - ok, err = p.AddTable(ctx, spanz.TableIDToComparableSpan(3), 20, false) + ok, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(3), 20, false) require.NoError(t, err) require.True(t, ok) table3 := p.tables[3].(*mockTablePipeline) require.Equal(t, model.Ts(20), table3.sinkStartTs) require.Equal(t, tablepb.TableStatePreparing, table3.state) - ok, err = p.AddTable(ctx, spanz.TableIDToComparableSpan(4), 20, false) + ok, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(4), 20, false) require.NoError(t, err) require.True(t, ok) table4 := p.tables[4].(*mockTablePipeline) @@ -409,16 +408,16 @@ func TestTableExecutorAddingTableDirectly(t *testing.T) { checkpointTs := p.agent.GetLastSentCheckpointTs() require.Equal(t, checkpointTs, model.Ts(0)) - done := p.IsAddTableFinished(spanz.TableIDToComparableSpan(1), false) + done := p.IsAddTableSpanFinished(spanz.TableIDToComparableSpan(1), false) require.False(t, done) require.Equal(t, tablepb.TableStatePreparing, table1.State()) - done = p.IsAddTableFinished(spanz.TableIDToComparableSpan(2), false) + done = p.IsAddTableSpanFinished(spanz.TableIDToComparableSpan(2), false) require.False(t, done) require.Equal(t, tablepb.TableStatePreparing, table2.State()) - done = p.IsAddTableFinished(spanz.TableIDToComparableSpan(3), false) + done = p.IsAddTableSpanFinished(spanz.TableIDToComparableSpan(3), false) require.False(t, done) require.Equal(t, tablepb.TableStatePreparing, table3.State()) - done = p.IsAddTableFinished(spanz.TableIDToComparableSpan(4), false) + done = p.IsAddTableSpanFinished(spanz.TableIDToComparableSpan(4), false) require.False(t, done) require.Equal(t, tablepb.TableStatePreparing, table4.State()) require.Len(t, p.tables, 4) @@ -438,16 +437,16 @@ func TestTableExecutorAddingTableDirectly(t *testing.T) { table3.checkpointTs = 30 table4.checkpointTs = 30 - done = p.IsAddTableFinished(spanz.TableIDToComparableSpan(1), false) + done = p.IsAddTableSpanFinished(spanz.TableIDToComparableSpan(1), false) require.True(t, done) require.Equal(t, tablepb.TableStateReplicating, table1.State()) - done = p.IsAddTableFinished(spanz.TableIDToComparableSpan(2), false) + done = p.IsAddTableSpanFinished(spanz.TableIDToComparableSpan(2), false) require.True(t, done) require.Equal(t, tablepb.TableStateReplicating, table2.State()) - done = p.IsAddTableFinished(spanz.TableIDToComparableSpan(3), false) + done = p.IsAddTableSpanFinished(spanz.TableIDToComparableSpan(3), false) require.True(t, done) require.Equal(t, tablepb.TableStateReplicating, table3.State()) - done = p.IsAddTableFinished(spanz.TableIDToComparableSpan(4), false) + done = p.IsAddTableSpanFinished(spanz.TableIDToComparableSpan(4), false) require.True(t, done) require.Equal(t, tablepb.TableStateReplicating, table4.State()) @@ -473,7 +472,7 @@ func TestTableExecutorAddingTableDirectly(t *testing.T) { require.NoError(t, err) tester.MustApplyPatches() - ok = p.RemoveTable(spanz.TableIDToComparableSpan(3)) + ok = p.RemoveTableSpan(spanz.TableIDToComparableSpan(3)) require.True(t, ok) err = p.Tick(ctx) @@ -485,7 +484,7 @@ func TestTableExecutorAddingTableDirectly(t *testing.T) { require.False(t, table3.canceled) require.Equal(t, model.Ts(60), table3.CheckpointTs()) - checkpointTs, done = p.IsRemoveTableFinished(spanz.TableIDToComparableSpan(3)) + checkpointTs, done = p.IsRemoveTableSpanFinished(spanz.TableIDToComparableSpan(3)) require.False(t, done) require.Equal(t, model.Ts(0), checkpointTs) @@ -508,10 +507,10 @@ func TestTableExecutorAddingTableDirectly(t *testing.T) { require.Len(t, p.tables, 4) require.False(t, table3.canceled) - checkpointTs, done = p.IsRemoveTableFinished(spanz.TableIDToComparableSpan(3)) + checkpointTs, done = p.IsRemoveTableSpanFinished(spanz.TableIDToComparableSpan(3)) require.True(t, done) require.Equal(t, model.Ts(65), checkpointTs) - meta = p.GetTableStatus(spanz.TableIDToComparableSpan(3)) + meta = p.GetTableSpanStatus(spanz.TableIDToComparableSpan(3)) require.Equal(t, model.TableID(3), meta.TableID) require.Equal(t, spanz.TableIDToComparableSpan(3), meta.Span) require.Equal(t, tablepb.TableStateAbsent, meta.State) @@ -605,10 +604,10 @@ func TestProcessorClose(t *testing.T) { tester.MustApplyPatches() // add tables - done, err := p.AddTable(ctx, spanz.TableIDToComparableSpan(1), 20, false) + done, err := p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), 20, false) require.Nil(t, err) require.True(t, done) - done, err = p.AddTable(ctx, spanz.TableIDToComparableSpan(2), 30, false) + done, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(2), 30, false) require.Nil(t, err) require.True(t, done) @@ -645,10 +644,10 @@ func TestProcessorClose(t *testing.T) { tester.MustApplyPatches() // add tables - done, err = p.AddTable(ctx, spanz.TableIDToComparableSpan(1), 20, false) + done, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), 20, false) require.Nil(t, err) require.True(t, done) - done, err = p.AddTable(ctx, spanz.TableIDToComparableSpan(2), 30, false) + done, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(2), 30, false) require.Nil(t, err) require.True(t, done) err = p.Tick(ctx) @@ -678,10 +677,10 @@ func TestPositionDeleted(t *testing.T) { p, tester := initProcessor4Test(ctx, t, &liveness) var err error // add table - done, err := p.AddTable(ctx, spanz.TableIDToComparableSpan(1), 30, false) + done, err := p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), 30, false) require.Nil(t, err) require.True(t, done) - done, err = p.AddTable(ctx, spanz.TableIDToComparableSpan(2), 40, false) + done, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(2), 40, false) require.Nil(t, err) require.True(t, done) // init tick @@ -798,7 +797,7 @@ func TestUpdateBarrierTs(t *testing.T) { }) p.schemaStorage.(*mockSchemaStorage).resolvedTs = 10 - done, err := p.AddTable(ctx, spanz.TableIDToComparableSpan(1), 5, false) + done, err := p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), 5, false) require.True(t, done) require.Nil(t, err) err = p.Tick(ctx) diff --git a/cdc/processor/tablepb/table.proto b/cdc/processor/tablepb/table.proto index 8b8f22eb64f..3c3f7c260c6 100644 --- a/cdc/processor/tablepb/table.proto +++ b/cdc/processor/tablepb/table.proto @@ -43,6 +43,7 @@ message Span { // ┌─────────┐ ┌──────────┐ ┌─────────────┐ // │ Stopped │ <─┤ Stopping │ <─┤ Replicating │ // └─────────┘ └──────────┘ └─────────────┘ +// TODO rename to TableSpanState. enum TableState { Unknown = 0 [(gogoproto.enumvalue_customname) = "TableStateUnknown"]; Absent = 1 [(gogoproto.enumvalue_customname) = "TableStateAbsent"]; @@ -71,6 +72,7 @@ message Stats { } // TableStatus is the running status of a table. +// TODO rename to TableStatus. message TableStatus { int64 table_id = 1 [ (gogoproto.casttype) = "github.com/pingcap/tiflow/cdc/model.TableID", diff --git a/cdc/scheduler/internal/table_executor.go b/cdc/scheduler/internal/table_executor.go index 2848e8cb67a..b09391e525c 100644 --- a/cdc/scheduler/internal/table_executor.go +++ b/cdc/scheduler/internal/table_executor.go @@ -26,37 +26,37 @@ import ( // to adapt the current Processor implementation to it. // TODO find a way to make the semantics easier to understand. type TableExecutor interface { - // AddTable add a new table with `startTs` + // AddTableSpan add a new table span with `startTs` // if `isPrepare` is true, the 1st phase of the 2 phase scheduling protocol. // if `isPrepare` is false, the 2nd phase. - AddTable( + AddTableSpan( ctx context.Context, span tablepb.Span, startTs model.Ts, isPrepare bool, ) (done bool, err error) - // IsAddTableFinished make sure the requested table is in the proper status - IsAddTableFinished(span tablepb.Span, isPrepare bool) (done bool) + // IsAddTableSpanFinished make sure the requested table span is in the proper status + IsAddTableSpanFinished(span tablepb.Span, isPrepare bool) (done bool) - // RemoveTable remove the table, return true if the table is already removed - RemoveTable(span tablepb.Span) (done bool) - // IsRemoveTableFinished convince the table is fully stopped. + // RemoveTableSpan remove the table, return true if the table is already removed + RemoveTableSpan(span tablepb.Span) (done bool) + // IsRemoveTableSpanFinished convince the table is fully stopped. // return false if table is not stopped // return true and corresponding checkpoint otherwise. - IsRemoveTableFinished(span tablepb.Span) (model.Ts, bool) + IsRemoveTableSpanFinished(span tablepb.Span) (model.Ts, bool) - // GetTableCount should return the number of tables that are being run, + // GetTableSpanCount should return the number of table spans that are being run, // being added and being removed. // // NOTE: two subsequent calls to the method should return the same // result, unless there is a call to AddTable, RemoveTable, IsAddTableFinished - // or IsRemoveTableFinished in between two calls to this method. - GetTableCount() int + // or IsRemoveTableSpanFinished in between two calls to this method. + GetTableSpanCount() int // GetCheckpoint returns the local checkpoint-ts and resolved-ts of // the processor. Its calculation should take into consideration all - // tables that would have been returned if GetTableCount had been + // tables that would have been returned if GetTableSpanCount had been // called immediately before. GetCheckpoint() (checkpointTs, resolvedTs model.Ts) - // GetTableStatus return the checkpoint and resolved ts for the given table - GetTableStatus(span tablepb.Span) tablepb.TableStatus + // GetTableSpanStatus return the checkpoint and resolved ts for the given table span. + GetTableSpanStatus(span tablepb.Span) tablepb.TableStatus } diff --git a/cdc/scheduler/internal/v3/agent/agent.go b/cdc/scheduler/internal/v3/agent/agent.go index 5e8be028ecd..018a0c2166c 100644 --- a/cdc/scheduler/internal/v3/agent/agent.go +++ b/cdc/scheduler/internal/v3/agent/agent.go @@ -42,7 +42,7 @@ type agent struct { trans transport.Transport compat *compat.Compat - tableM *tableManager + tableM *tableSpanManager ownerInfo ownerInfo @@ -92,7 +92,7 @@ func newAgent( ) (internal.Agent, error) { result := &agent{ agentInfo: newAgentInfo(changeFeedID, captureID), - tableM: newTableManager(changeFeedID, tableExecutor), + tableM: newTableSpanManager(changeFeedID, tableExecutor), liveness: liveness, compat: compat.New(cfg, map[model.CaptureID]*model.CaptureInfo{}), } @@ -259,10 +259,10 @@ func (a *agent) handleMessage(msg []*schedulepb.Message) []*schedulepb.Message { } func (a *agent) handleMessageHeartbeat(request *schedulepb.Heartbeat) *schedulepb.Message { - allTables := a.tableM.getAllTables() + allTables := a.tableM.getAllTableSpans() result := make([]tablepb.TableStatus, 0, allTables.Len()) - allTables.Ascend(func(span tablepb.Span, table *table) bool { - status := table.getTableStatus() + allTables.Ascend(func(span tablepb.Span, table *tableSpan) bool { + status := table.getTableSpanStatus() if table.task != nil && table.task.IsRemove { status.State = tablepb.TableStateStopping } @@ -271,7 +271,7 @@ func (a *agent) handleMessageHeartbeat(request *schedulepb.Heartbeat) *schedulep }) for _, span := range request.GetSpans() { if _, ok := allTables.Get(span); !ok { - status := a.tableM.getTableStatus(span) + status := a.tableM.getTableSpanStatus(span) result = append(result, status) } } @@ -329,7 +329,7 @@ func (a *agent) handleMessageDispatchTableRequest( return } var ( - table *table + table *tableSpan task *dispatchTableTask ok bool ) @@ -346,10 +346,10 @@ func (a *agent) handleMessageDispatchTableRequest( Epoch: epoch, status: dispatchTableTaskReceived, } - table = a.tableM.addTable(span) + table = a.tableM.addTableSpan(span) case *schedulepb.DispatchTableRequest_RemoveTable: span := req.RemoveTable.GetSpan() - table, ok = a.tableM.getTable(span) + table, ok = a.tableM.getTableSpan(span) if !ok { log.Warn("schedulerv3: agent ignore remove table request, "+ "since the table not found", diff --git a/cdc/scheduler/internal/v3/agent/agent_bench_test.go b/cdc/scheduler/internal/v3/agent/agent_bench_test.go index d6020fdb551..613b6d1f7ab 100644 --- a/cdc/scheduler/internal/v3/agent/agent_bench_test.go +++ b/cdc/scheduler/internal/v3/agent/agent_bench_test.go @@ -28,13 +28,13 @@ func benchmarkHeartbeatResponse(b *testing.B, bench func(b *testing.B, a *agent) tableExec := newMockTableExecutor() liveness := model.LivenessCaptureAlive a := &agent{ - tableM: newTableManager(model.ChangeFeedID{}, tableExec), + tableM: newTableSpanManager(model.ChangeFeedID{}, tableExec), liveness: &liveness, } for j := 0; j < size; j++ { span := spanz.TableIDToComparableSpan(model.TableID(10000 + j)) - _ = a.tableM.addTable(span) + _ = a.tableM.addTableSpan(span) } b.ResetTimer() diff --git a/cdc/scheduler/internal/v3/agent/agent_test.go b/cdc/scheduler/internal/v3/agent/agent_test.go index 04a6ff10760..9d4c106795c 100644 --- a/cdc/scheduler/internal/v3/agent/agent_test.go +++ b/cdc/scheduler/internal/v3/agent/agent_test.go @@ -144,7 +144,7 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) { a := newAgent4Test() mockTableExecutor := newMockTableExecutor() - a.tableM = newTableManager(model.ChangeFeedID{}, mockTableExecutor) + a.tableM = newTableSpanManager(model.ChangeFeedID{}, mockTableExecutor) removeTableRequest := &schedulepb.DispatchTableRequest{ Request: &schedulepb.DispatchTableRequest_RemoveTable{ @@ -174,7 +174,7 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) { // addTableRequest should be not ignored even if it's stopping. a.handleLivenessUpdate(model.LivenessCaptureStopping) require.Equal(t, model.LivenessCaptureStopping, a.liveness.Load()) - mockTableExecutor.On("AddTable", mock.Anything, mock.Anything, + mockTableExecutor.On("AddTableSpan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(false, nil) a.handleMessageDispatchTableRequest(addTableRequest, processorEpoch) responses, err = a.tableM.poll(ctx) @@ -192,16 +192,16 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) { *a.liveness = model.LivenessCaptureAlive require.Equal(t, model.LivenessCaptureAlive, a.liveness.Load()) mockTableExecutor.ExpectedCalls = nil - mockTableExecutor.On("AddTable", mock.Anything, mock.Anything, + mockTableExecutor.On("AddTableSpan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(true, nil) - mockTableExecutor.On("IsAddTableFinished", mock.Anything, + mockTableExecutor.On("IsAddTableSpanFinished", mock.Anything, mock.Anything, mock.Anything).Return(false, nil) a.handleMessageDispatchTableRequest(addTableRequest, processorEpoch) _, err = a.tableM.poll(ctx) require.NoError(t, err) mockTableExecutor.ExpectedCalls = mockTableExecutor.ExpectedCalls[:1] - mockTableExecutor.On("IsAddTableFinished", mock.Anything, + mockTableExecutor.On("IsAddTableSpanFinished", mock.Anything, mock.Anything, mock.Anything).Return(true, nil) a.handleMessageDispatchTableRequest(addTableRequest, processorEpoch) responses, err = a.tableM.poll(ctx) @@ -219,9 +219,10 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) { addTableRequest.Request.(*schedulepb.DispatchTableRequest_AddTable). AddTable.IsSecondary = false - // only mock `IsAddTableFinished`, since `AddTable` by start a prepared table always success. + // only mock `IsAddTableSpanFinished`, since `AddTable` by start a prepared + // table span always success. mockTableExecutor.ExpectedCalls = nil - mockTableExecutor.On("IsAddTableFinished", mock.Anything, + mockTableExecutor.On("IsAddTableSpanFinished", mock.Anything, mock.Anything, mock.Anything).Return(false, nil) a.handleMessageDispatchTableRequest(addTableRequest, processorEpoch) @@ -237,7 +238,7 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) { require.True(t, a.tableM.tables.Has(spanz.TableIDToComparableSpan(1))) mockTableExecutor.ExpectedCalls = nil - mockTableExecutor.On("IsAddTableFinished", mock.Anything, + mockTableExecutor.On("IsAddTableSpanFinished", mock.Anything, mock.Anything, mock.Anything).Return(true, nil) a.handleMessageDispatchTableRequest(addTableRequest, processorEpoch) responses, err = a.tableM.poll(ctx) @@ -251,7 +252,7 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) { require.Equal(t, tablepb.TableStateReplicating, addTableResponse.AddTable.Status.State) require.True(t, a.tableM.tables.Has(spanz.TableIDToComparableSpan(1))) - mockTableExecutor.On("RemoveTable", mock.Anything, mock.Anything). + mockTableExecutor.On("RemoveTableSpan", mock.Anything, mock.Anything). Return(false) // remove table in the replicating state failed, should still in replicating. a.handleMessageDispatchTableRequest(removeTableRequest, processorEpoch) @@ -266,9 +267,9 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) { require.True(t, a.tableM.tables.Has(spanz.TableIDToComparableSpan(1))) mockTableExecutor.ExpectedCalls = nil - mockTableExecutor.On("RemoveTable", mock.Anything, mock.Anything). + mockTableExecutor.On("RemoveTableSpan", mock.Anything, mock.Anything). Return(true) - mockTableExecutor.On("IsRemoveTableFinished", mock.Anything, mock.Anything). + mockTableExecutor.On("IsRemoveTableSpanFinished", mock.Anything, mock.Anything). Return(3, false) // remove table in the replicating state failed, should still in replicating. a.handleMessageDispatchTableRequest(removeTableRequest, processorEpoch) @@ -282,7 +283,7 @@ func TestAgentHandleMessageDispatchTable(t *testing.T) { require.Equal(t, tablepb.TableStateStopping, removeTableResponse.RemoveTable.Status.State) mockTableExecutor.ExpectedCalls = mockTableExecutor.ExpectedCalls[:1] - mockTableExecutor.On("IsRemoveTableFinished", mock.Anything, mock.Anything). + mockTableExecutor.On("IsRemoveTableSpanFinished", mock.Anything, mock.Anything). Return(3, true) // remove table in the replicating state success, should in stopped a.handleMessageDispatchTableRequest(removeTableRequest, processorEpoch) @@ -303,10 +304,10 @@ func TestAgentHandleMessageHeartbeat(t *testing.T) { a := newAgent4Test() mockTableExecutor := newMockTableExecutor() - a.tableM = newTableManager(model.ChangeFeedID{}, mockTableExecutor) + a.tableM = newTableSpanManager(model.ChangeFeedID{}, mockTableExecutor) for i := 0; i < 5; i++ { - a.tableM.addTable(spanz.TableIDToComparableSpan(int64(i))) + a.tableM.addTableSpan(spanz.TableIDToComparableSpan(int64(i))) } a.tableM.tables.GetV(spanz.TableIDToComparableSpan(0)).state = tablepb.TableStatePreparing @@ -393,7 +394,7 @@ func TestAgentPermuteMessages(t *testing.T) { a := newAgent4Test() mockTableExecutor := newMockTableExecutor() - a.tableM = newTableManager(model.ChangeFeedID{}, mockTableExecutor) + a.tableM = newTableSpanManager(model.ChangeFeedID{}, mockTableExecutor) trans := transport.NewMockTrans() a.trans = trans @@ -501,10 +502,10 @@ func TestAgentPermuteMessages(t *testing.T) { switch message.DispatchTableRequest.Request.(type) { case *schedulepb.DispatchTableRequest_AddTable: for _, ok := range []bool{false, true} { - mockTableExecutor.On("AddTable", mock.Anything, mock.Anything, + mockTableExecutor.On("AddTableSpan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(ok, nil) for _, ok1 := range []bool{false, true} { - mockTableExecutor.On("IsAddTableFinished", mock.Anything, + mockTableExecutor.On("IsAddTableSpanFinished", mock.Anything, mock.Anything, mock.Anything).Return(ok1, nil) trans.RecvBuffer = append(trans.RecvBuffer, message) @@ -518,11 +519,11 @@ func TestAgentPermuteMessages(t *testing.T) { } case *schedulepb.DispatchTableRequest_RemoveTable: for _, ok := range []bool{false, true} { - mockTableExecutor.On("RemoveTable", mock.Anything, + mockTableExecutor.On("RemoveTableSpan", mock.Anything, mock.Anything).Return(ok) for _, ok1 := range []bool{false, true} { trans.RecvBuffer = append(trans.RecvBuffer, message) - mockTableExecutor.On("IsRemoveTableFinished", + mockTableExecutor.On("IsRemoveTableSpanFinished", mock.Anything, mock.Anything).Return(0, ok1) err := a.Tick(ctx) require.NoError(t, err) @@ -555,7 +556,7 @@ func TestAgentHandleMessage(t *testing.T) { t.Parallel() mockTableExecutor := newMockTableExecutor() - tableM := newTableManager(model.ChangeFeedID{}, mockTableExecutor) + tableM := newTableSpanManager(model.ChangeFeedID{}, mockTableExecutor) a := newAgent4Test() a.tableM = tableM @@ -649,7 +650,7 @@ func TestAgentTick(t *testing.T) { trans := transport.NewMockTrans() mockTableExecutor := newMockTableExecutor() a.trans = trans - a.tableM = newTableManager(model.ChangeFeedID{}, mockTableExecutor) + a.tableM = newTableSpanManager(model.ChangeFeedID{}, mockTableExecutor) heartbeat := &schedulepb.Message{ Header: &schedulepb.Message_Header{ @@ -716,9 +717,9 @@ func TestAgentTick(t *testing.T) { messages = append(messages, removeTableRequest) trans.RecvBuffer = append(trans.RecvBuffer, messages...) - mockTableExecutor.On("AddTable", mock.Anything, + mockTableExecutor.On("AddTableSpan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(true, nil) - mockTableExecutor.On("IsAddTableFinished", mock.Anything, + mockTableExecutor.On("IsAddTableSpanFinished", mock.Anything, mock.Anything, mock.Anything).Return(false, nil) require.NoError(t, a.Tick(ctx)) trans.SendBuffer = trans.SendBuffer[:0] @@ -726,7 +727,7 @@ func TestAgentTick(t *testing.T) { trans.RecvBuffer = append(trans.RecvBuffer, addTableRequest) mockTableExecutor.ExpectedCalls = mockTableExecutor.ExpectedCalls[:1] - mockTableExecutor.On("IsAddTableFinished", mock.Anything, + mockTableExecutor.On("IsAddTableSpanFinished", mock.Anything, mock.Anything, mock.Anything).Return(true, nil) require.NoError(t, a.Tick(ctx)) responses := trans.SendBuffer[:len(trans.SendBuffer)] @@ -746,7 +747,7 @@ func TestAgentHandleLivenessUpdate(t *testing.T) { // Test liveness via heartbeat. mockTableExecutor := newMockTableExecutor() - tableM := newTableManager(model.ChangeFeedID{}, mockTableExecutor) + tableM := newTableSpanManager(model.ChangeFeedID{}, mockTableExecutor) a := newAgent4Test() a.tableM = tableM require.Equal(t, model.LivenessCaptureAlive, a.liveness.Load()) @@ -773,7 +774,7 @@ func TestAgentCommitAddTableDuringStopping(t *testing.T) { a := newAgent4Test() mockTableExecutor := newMockTableExecutor() - a.tableM = newTableManager(model.ChangeFeedID{}, mockTableExecutor) + a.tableM = newTableSpanManager(model.ChangeFeedID{}, mockTableExecutor) trans := transport.NewMockTrans() a.trans = trans @@ -799,20 +800,20 @@ func TestAgentCommitAddTableDuringStopping(t *testing.T) { // Prepare add table is still in-progress. mockTableExecutor. - On("AddTable", mock.Anything, mock.Anything, mock.Anything, mock.Anything). + On("AddTableSpan", mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(true, nil).Once() mockTableExecutor. - On("IsAddTableFinished", mock.Anything, mock.Anything, mock.Anything, mock.Anything). + On("IsAddTableSpanFinished", mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(false, nil).Once() err := a.Tick(context.Background()) require.Nil(t, err) require.Len(t, trans.SendBuffer, 0) mockTableExecutor. - On("AddTable", mock.Anything, mock.Anything, mock.Anything, mock.Anything). + On("AddTableSpan", mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(true, nil).Once() mockTableExecutor. - On("IsAddTableFinished", mock.Anything, mock.Anything, mock.Anything, mock.Anything). + On("IsAddTableSpanFinished", mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(true, nil).Once() err = a.Tick(context.Background()) require.Nil(t, err) @@ -841,10 +842,10 @@ func TestAgentCommitAddTableDuringStopping(t *testing.T) { trans.RecvBuffer = []*schedulepb.Message{commitTableMsg} trans.SendBuffer = []*schedulepb.Message{} mockTableExecutor. - On("AddTable", mock.Anything, mock.Anything, mock.Anything, mock.Anything). + On("AddTableSpan", mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(true, nil).Once() mockTableExecutor. - On("IsAddTableFinished", mock.Anything, mock.Anything, mock.Anything, mock.Anything). + On("IsAddTableSpanFinished", mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(false, nil).Once() // Set liveness to stopping. a.liveness.Store(model.LivenessCaptureStopping) @@ -855,7 +856,7 @@ func TestAgentCommitAddTableDuringStopping(t *testing.T) { trans.RecvBuffer = []*schedulepb.Message{} trans.SendBuffer = []*schedulepb.Message{} mockTableExecutor. - On("IsAddTableFinished", mock.Anything, mock.Anything, mock.Anything, mock.Anything). + On("IsAddTableSpanFinished", mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(true, nil).Once() err = a.Tick(context.Background()) require.Nil(t, err) @@ -870,7 +871,7 @@ func TestAgentTransportCompat(t *testing.T) { a := newAgent4Test() mockTableExecutor := newMockTableExecutor() - a.tableM = newTableManager(model.ChangeFeedID{}, mockTableExecutor) + a.tableM = newTableSpanManager(model.ChangeFeedID{}, mockTableExecutor) trans := transport.NewMockTrans() a.trans = trans a.compat = compat.New(&config.SchedulerConfig{ @@ -969,11 +970,11 @@ func newMockTableExecutor() *MockTableExecutor { } } -// AddTable adds a table to the executor. -func (e *MockTableExecutor) AddTable( +// AddTableSpan adds a table span to the executor. +func (e *MockTableExecutor) AddTableSpan( ctx context.Context, tableID tablepb.Span, startTs model.Ts, isPrepare bool, ) (bool, error) { - log.Info("AddTable", + log.Info("AddTableSpan", zap.Stringer("span", &tableID), zap.Any("startTs", startTs), zap.Bool("isPrepare", isPrepare)) @@ -1001,8 +1002,8 @@ func (e *MockTableExecutor) AddTable( return args.Bool(0), args.Error(1) } -// IsAddTableFinished determines if the table has been added. -func (e *MockTableExecutor) IsAddTableFinished(tableID tablepb.Span, isPrepare bool) bool { +// IsAddTableSpanFinished determines if the table span has been added. +func (e *MockTableExecutor) IsAddTableSpanFinished(tableID tablepb.Span, isPrepare bool) bool { _, ok := e.tables.Get(tableID) if !ok { log.Panic("table which was added is not found", @@ -1027,8 +1028,8 @@ func (e *MockTableExecutor) IsAddTableFinished(tableID tablepb.Span, isPrepare b return false } -// RemoveTable removes a table from the executor. -func (e *MockTableExecutor) RemoveTable(tableID tablepb.Span) bool { +// RemoveTableSpan removes a table span from the executor. +func (e *MockTableExecutor) RemoveTableSpan(tableID tablepb.Span) bool { state, ok := e.tables.Get(tableID) if !ok { log.Warn("table to be remove is not found", zap.Stringer("span", &tableID)) @@ -1041,7 +1042,7 @@ func (e *MockTableExecutor) RemoveTable(tableID tablepb.Span) bool { default: } // the current `processor implementation, does not consider table's state - log.Info("RemoveTable", zap.Stringer("span", &tableID), zap.Any("state", state)) + log.Info("RemoveTableSpan", zap.Stringer("span", &tableID), zap.Any("state", state)) args := e.Called(tableID) if args.Bool(0) { @@ -1050,8 +1051,8 @@ func (e *MockTableExecutor) RemoveTable(tableID tablepb.Span) bool { return args.Bool(0) } -// IsRemoveTableFinished determines if the table has been removed. -func (e *MockTableExecutor) IsRemoveTableFinished(tableID tablepb.Span) (model.Ts, bool) { +// IsRemoveTableSpanFinished determines if the table span has been removed. +func (e *MockTableExecutor) IsRemoveTableSpanFinished(tableID tablepb.Span) (model.Ts, bool) { state, ok := e.tables.Get(tableID) if !ok { // the real `table executor` processor, would panic in such case. @@ -1073,8 +1074,8 @@ func (e *MockTableExecutor) IsRemoveTableFinished(tableID tablepb.Span) (model.T return model.Ts(args.Int(0)), args.Bool(1) } -// GetTableCount returns all tables that are currently being adding, running, or removing. -func (e *MockTableExecutor) GetTableCount() int { +// GetTableSpanCount returns all tables that are currently being adding, running, or removing. +func (e *MockTableExecutor) GetTableSpanCount() int { var result int e.tables.Ascend(func(span tablepb.Span, value tablepb.TableState) bool { result++ @@ -1089,8 +1090,8 @@ func (e *MockTableExecutor) GetCheckpoint() (checkpointTs, resolvedTs model.Ts) return args.Get(0).(model.Ts), args.Get(1).(model.Ts) } -// GetTableStatus implements TableExecutor interface -func (e *MockTableExecutor) GetTableStatus(span tablepb.Span) tablepb.TableStatus { +// GetTableSpanStatus implements TableExecutor interface +func (e *MockTableExecutor) GetTableSpanStatus(span tablepb.Span) tablepb.TableStatus { state, ok := e.tables.Get(span) if !ok { state = tablepb.TableStateAbsent diff --git a/cdc/scheduler/internal/v3/agent/table.go b/cdc/scheduler/internal/v3/agent/table.go index 75260586fe7..3e4ca29ec1e 100644 --- a/cdc/scheduler/internal/v3/agent/table.go +++ b/cdc/scheduler/internal/v3/agent/table.go @@ -26,9 +26,9 @@ import ( "go.uber.org/zap" ) -// table is a state machine that manage the table's state, +// tableSpan is a state machine that manage the tableSpan's state, // also tracking its progress by utilize the `TableExecutor` -type table struct { +type tableSpan struct { changefeedID model.ChangeFeedID span tablepb.Span @@ -38,10 +38,10 @@ type table struct { task *dispatchTableTask } -func newTable( +func newTableSpan( changefeed model.ChangeFeedID, span tablepb.Span, executor internal.TableExecutor, -) *table { - return &table{ +) *tableSpan { + return &tableSpan{ changefeedID: changefeed, span: span, state: tablepb.TableStateAbsent, // use `absent` as the default state. @@ -50,11 +50,11 @@ func newTable( } } -// getAndUpdateTableState get the table' state, return true if the table state changed -func (t *table) getAndUpdateTableState() (tablepb.TableState, bool) { +// getAndUpdateTableSpanState get the table span' state, return true if the table state changed +func (t *tableSpan) getAndUpdateTableSpanState() (tablepb.TableState, bool) { oldState := t.state - meta := t.executor.GetTableStatus(t.span) + meta := t.executor.GetTableSpanStatus(t.span) t.state = meta.State if oldState != t.state { @@ -69,8 +69,8 @@ func (t *table) getAndUpdateTableState() (tablepb.TableState, bool) { return t.state, false } -func (t *table) getTableStatus() tablepb.TableStatus { - return t.executor.GetTableStatus(t.span) +func (t *tableSpan) getTableSpanStatus() tablepb.TableStatus { + return t.executor.GetTableSpanStatus(t.span) } func newAddTableResponseMessage(status tablepb.TableStatus) *schedulepb.Message { @@ -103,8 +103,8 @@ func newRemoveTableResponseMessage(status tablepb.TableStatus) *schedulepb.Messa return message } -func (t *table) handleRemoveTableTask() *schedulepb.Message { - state, _ := t.getAndUpdateTableState() +func (t *tableSpan) handleRemoveTableTask() *schedulepb.Message { + state, _ := t.getAndUpdateTableSpanState() changed := true for changed { switch state { @@ -114,33 +114,33 @@ func (t *table) handleRemoveTableTask() *schedulepb.Message { zap.String("changefeed", t.changefeedID.ID), zap.Int64("tableID", t.span.TableID)) t.task = nil - return newRemoveTableResponseMessage(t.getTableStatus()) + return newRemoveTableResponseMessage(t.getTableSpanStatus()) case tablepb.TableStateStopping, // stopping now is useless tablepb.TableStateStopped: // release table resource, and get the latest checkpoint - // this will let the table become `absent` - checkpointTs, done := t.executor.IsRemoveTableFinished(t.span) + // this will let the table span become `absent` + checkpointTs, done := t.executor.IsRemoveTableSpanFinished(t.span) if !done { // actually, this should never be hit, since we know that table is stopped. - status := t.getTableStatus() + status := t.getTableSpanStatus() status.State = tablepb.TableStateStopping return newRemoveTableResponseMessage(status) } t.task = nil - status := t.getTableStatus() + status := t.getTableSpanStatus() status.State = tablepb.TableStateStopped status.Checkpoint.CheckpointTs = checkpointTs return newRemoveTableResponseMessage(status) case tablepb.TableStatePreparing, tablepb.TableStatePrepared, tablepb.TableStateReplicating: - done := t.executor.RemoveTable(t.task.Span) + done := t.executor.RemoveTableSpan(t.task.Span) if !done { - status := t.getTableStatus() + status := t.getTableSpanStatus() status.State = tablepb.TableStateStopping return newRemoveTableResponseMessage(status) } - state, changed = t.getAndUpdateTableState() + state, changed = t.getAndUpdateTableSpanState() default: log.Panic("schedulerv3: unknown table state", zap.String("namespace", t.changefeedID.Namespace), @@ -151,30 +151,32 @@ func (t *table) handleRemoveTableTask() *schedulepb.Message { return nil } -func (t *table) handleAddTableTask(ctx context.Context) (result *schedulepb.Message, err error) { - state, _ := t.getAndUpdateTableState() +func (t *tableSpan) handleAddTableTask( + ctx context.Context, +) (result *schedulepb.Message, err error) { + state, _ := t.getAndUpdateTableSpanState() changed := true for changed { switch state { case tablepb.TableStateAbsent: - done, err := t.executor.AddTable(ctx, t.task.Span, t.task.StartTs, t.task.IsPrepare) + done, err := t.executor.AddTableSpan(ctx, t.task.Span, t.task.StartTs, t.task.IsPrepare) if err != nil || !done { log.Warn("schedulerv3: agent add table failed", zap.String("namespace", t.changefeedID.Namespace), zap.String("changefeed", t.changefeedID.ID), zap.Int64("tableID", t.span.TableID), zap.Any("task", t.task), zap.Error(err)) - status := t.getTableStatus() + status := t.getTableSpanStatus() return newAddTableResponseMessage(status), errors.Trace(err) } - state, changed = t.getAndUpdateTableState() + state, changed = t.getAndUpdateTableSpanState() case tablepb.TableStateReplicating: log.Info("schedulerv3: table is replicating", zap.String("namespace", t.changefeedID.Namespace), zap.String("changefeed", t.changefeedID.ID), zap.Int64("tableID", t.span.TableID), zap.Stringer("state", state)) t.task = nil - status := t.getTableStatus() + status := t.getTableSpanStatus() return newAddTableResponseMessage(status), nil case tablepb.TableStatePrepared: if t.task.IsPrepare { @@ -184,36 +186,36 @@ func (t *table) handleAddTableTask(ctx context.Context) (result *schedulepb.Mess zap.String("changefeed", t.changefeedID.ID), zap.Int64("tableID", t.span.TableID), zap.Stringer("state", state)) t.task = nil - return newAddTableResponseMessage(t.getTableStatus()), nil + return newAddTableResponseMessage(t.getTableSpanStatus()), nil } if t.task.status == dispatchTableTaskReceived { - done, err := t.executor.AddTable(ctx, t.task.Span, t.task.StartTs, false) + done, err := t.executor.AddTableSpan(ctx, t.task.Span, t.task.StartTs, false) if err != nil || !done { log.Warn("schedulerv3: agent add table failed", zap.String("namespace", t.changefeedID.Namespace), zap.String("changefeed", t.changefeedID.ID), zap.Int64("tableID", t.span.TableID), zap.Stringer("state", state), zap.Error(err)) - status := t.getTableStatus() + status := t.getTableSpanStatus() return newAddTableResponseMessage(status), errors.Trace(err) } t.task.status = dispatchTableTaskProcessed } - done := t.executor.IsAddTableFinished(t.task.Span, false) + done := t.executor.IsAddTableSpanFinished(t.task.Span, false) if !done { - return newAddTableResponseMessage(t.getTableStatus()), nil + return newAddTableResponseMessage(t.getTableSpanStatus()), nil } - state, changed = t.getAndUpdateTableState() + state, changed = t.getAndUpdateTableSpanState() case tablepb.TableStatePreparing: // `preparing` is not stable state and would last a long time, // it's no need to return such a state, to make the coordinator become burdensome. - done := t.executor.IsAddTableFinished(t.task.Span, t.task.IsPrepare) + done := t.executor.IsAddTableSpanFinished(t.task.Span, t.task.IsPrepare) if !done { return nil, nil } - state, changed = t.getAndUpdateTableState() + state, changed = t.getAndUpdateTableSpanState() log.Info("schedulerv3: add table finished", zap.String("namespace", t.changefeedID.Namespace), zap.String("changefeed", t.changefeedID.ID), @@ -225,7 +227,7 @@ func (t *table) handleAddTableTask(ctx context.Context) (result *schedulepb.Mess zap.String("changefeed", t.changefeedID.ID), zap.Int64("tableID", t.span.TableID)) t.task = nil - return newAddTableResponseMessage(t.getTableStatus()), nil + return newAddTableResponseMessage(t.getTableSpanStatus()), nil default: log.Panic("schedulerv3: unknown table state", zap.String("namespace", t.changefeedID.Namespace), @@ -237,7 +239,7 @@ func (t *table) handleAddTableTask(ctx context.Context) (result *schedulepb.Mess return nil, nil } -func (t *table) injectDispatchTableTask(task *dispatchTableTask) { +func (t *tableSpan) injectDispatchTableTask(task *dispatchTableTask) { if !t.span.Eq(&task.Span) { log.Panic("schedulerv3: tableID not match", zap.String("namespace", t.changefeedID.Namespace), @@ -263,7 +265,7 @@ func (t *table) injectDispatchTableTask(task *dispatchTableTask) { zap.Any("ignoredTask", task)) } -func (t *table) poll(ctx context.Context) (*schedulepb.Message, error) { +func (t *tableSpan) poll(ctx context.Context) (*schedulepb.Message, error) { if t.task == nil { return nil, nil } @@ -273,35 +275,35 @@ func (t *table) poll(ctx context.Context) (*schedulepb.Message, error) { return t.handleAddTableTask(ctx) } -type tableManager struct { - tables *spanz.Map[*table] +type tableSpanManager struct { + tables *spanz.Map[*tableSpan] executor internal.TableExecutor changefeedID model.ChangeFeedID } -func newTableManager( +func newTableSpanManager( changefeed model.ChangeFeedID, executor internal.TableExecutor, -) *tableManager { - return &tableManager{ - tables: spanz.NewMap[*table](), +) *tableSpanManager { + return &tableSpanManager{ + tables: spanz.NewMap[*tableSpan](), executor: executor, changefeedID: changefeed, } } -func (tm *tableManager) poll(ctx context.Context) ([]*schedulepb.Message, error) { +func (tm *tableSpanManager) poll(ctx context.Context) ([]*schedulepb.Message, error) { result := make([]*schedulepb.Message, 0) var err error toBeDropped := []tablepb.Span{} - tm.tables.Ascend(func(span tablepb.Span, table *table) bool { + tm.tables.Ascend(func(span tablepb.Span, table *tableSpan) bool { message, err1 := table.poll(ctx) if err != nil { err = errors.Trace(err1) return false } - state, _ := table.getAndUpdateTableState() + state, _ := table.getAndUpdateTableSpanState() if state == tablepb.TableStateAbsent { toBeDropped = append(toBeDropped, span) } @@ -313,26 +315,26 @@ func (tm *tableManager) poll(ctx context.Context) ([]*schedulepb.Message, error) return true }) for _, span := range toBeDropped { - tm.dropTable(span) + tm.dropTableSpan(span) } return result, err } -func (tm *tableManager) getAllTables() *spanz.Map[*table] { +func (tm *tableSpanManager) getAllTableSpans() *spanz.Map[*tableSpan] { return tm.tables } -// addTable add the target table, and return it. -func (tm *tableManager) addTable(span tablepb.Span) *table { +// addTableSpan add the target table span, and return it. +func (tm *tableSpanManager) addTableSpan(span tablepb.Span) *tableSpan { table, ok := tm.tables.Get(span) if !ok { - table = newTable(tm.changefeedID, span, tm.executor) + table = newTableSpan(tm.changefeedID, span, tm.executor) tm.tables.ReplaceOrInsert(span, table) } return table } -func (tm *tableManager) getTable(span tablepb.Span) (*table, bool) { +func (tm *tableSpanManager) getTableSpan(span tablepb.Span) (*tableSpan, bool) { table, ok := tm.tables.Get(span) if ok { return table, true @@ -340,7 +342,7 @@ func (tm *tableManager) getTable(span tablepb.Span) (*table, bool) { return nil, false } -func (tm *tableManager) dropTable(span tablepb.Span) { +func (tm *tableSpanManager) dropTableSpan(span tablepb.Span) { table, ok := tm.tables.Get(span) if !ok { log.Warn("schedulerv3: tableManager drop table not found", @@ -349,7 +351,7 @@ func (tm *tableManager) dropTable(span tablepb.Span) { zap.Stringer("span", &span)) return } - state, _ := table.getAndUpdateTableState() + state, _ := table.getAndUpdateTableSpanState() if state != tablepb.TableStateAbsent { log.Panic("schedulerv3: tableManager drop table undesired", zap.String("namespace", tm.changefeedID.Namespace), @@ -365,10 +367,10 @@ func (tm *tableManager) dropTable(span tablepb.Span) { tm.tables.Delete(span) } -func (tm *tableManager) getTableStatus(span tablepb.Span) tablepb.TableStatus { - table, ok := tm.getTable(span) +func (tm *tableSpanManager) getTableSpanStatus(span tablepb.Span) tablepb.TableStatus { + table, ok := tm.getTableSpan(span) if ok { - return table.getTableStatus() + return table.getTableSpanStatus() } return tablepb.TableStatus{ diff --git a/cdc/scheduler/internal/v3/agent/table_test.go b/cdc/scheduler/internal/v3/agent/table_test.go index 98eb9ee62d7..7835322211e 100644 --- a/cdc/scheduler/internal/v3/agent/table_test.go +++ b/cdc/scheduler/internal/v3/agent/table_test.go @@ -28,12 +28,12 @@ func TestTableManager(t *testing.T) { // pretend there are 4 tables mockTableExecutor := newMockTableExecutor() - tableM := newTableManager(model.ChangeFeedID{}, mockTableExecutor) + tableM := newTableSpanManager(model.ChangeFeedID{}, mockTableExecutor) span1 := spanz.TableIDToComparableSpan(1) - tableM.addTable(span1) + tableM.addTableSpan(span1) require.Equal(t, tablepb.TableStateAbsent, tableM.tables.GetV(span1).state) - tableM.dropTable(span1) + tableM.dropTableSpan(span1) require.False(t, tableM.tables.Has(span1)) } diff --git a/cdc/scheduler/schedulepb/table_schedule.proto b/cdc/scheduler/schedulepb/table_schedule.proto index 9de4494560c..4db38e475ce 100644 --- a/cdc/scheduler/schedulepb/table_schedule.proto +++ b/cdc/scheduler/schedulepb/table_schedule.proto @@ -20,6 +20,9 @@ import "processor/tablepb/table.proto"; option(gogoproto.goproto_enum_prefix_all) = false; +// TODO rename XXTableRequest to XXTableSpanRequest respectively. +// TODO rename XXTableResponse to XXTableSpanResponse respectively. + message AddTableRequest { int64 table_id = 1 [ (gogoproto.casttype) = "github.com/pingcap/tiflow/cdc/model.TableID", From a130d2aac57f8cfa4c32386970731c16ac5c6985 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Wed, 7 Dec 2022 17:17:10 +0800 Subject: [PATCH 4/4] generate protobuf Signed-off-by: Neil Shen --- cdc/processor/tablepb/table.pb.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cdc/processor/tablepb/table.pb.go b/cdc/processor/tablepb/table.pb.go index 497adf18b40..ec90928f2e2 100644 --- a/cdc/processor/tablepb/table.pb.go +++ b/cdc/processor/tablepb/table.pb.go @@ -33,6 +33,8 @@ const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package // ┌─────────┐ ┌──────────┐ ┌─────────────┐ // │ Stopped │ <─┤ Stopping │ <─┤ Replicating │ // └─────────┘ └──────────┘ └─────────────┘ +// +// TODO rename to TableSpanState. type TableState int32 const ( @@ -239,6 +241,7 @@ func (m *Stats) GetBarrierTs() github_com_pingcap_tiflow_cdc_model.Ts { } // TableStatus is the running status of a table. +// TODO rename to TableStatus. type TableStatus struct { TableID github_com_pingcap_tiflow_cdc_model.TableID `protobuf:"varint,1,opt,name=table_id,json=tableId,proto3,casttype=github.com/pingcap/tiflow/cdc/model.TableID" json:"table_id,omitempty"` Span Span `protobuf:"bytes,5,opt,name=span,proto3" json:"span"`