From 9c3eddbc230683de20c529fe73044bddabeda812 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BA=8C=E6=89=8B=E6=8E=89=E5=8C=85=E5=B7=A5=E7=A8=8B?= =?UTF-8?q?=E5=B8=88?= Date: Thu, 1 Dec 2022 18:30:02 +0800 Subject: [PATCH] processor/sourcemanager(ticdc): add sorter stats (#7748) ref pingcap/tiflow#5928 --- cdc/processor/processor.go | 34 +++--- cdc/processor/sinkmanager/manager.go | 33 +++++- cdc/processor/sinkmanager/redo_log_worker.go | 1 + .../sinkmanager/table_sink_worker.go | 8 +- .../sinkmanager/table_sink_wrapper.go | 19 +++ cdc/processor/sourcemanager/engine/engine.go | 12 ++ .../engine/memory/event_sorter.go | 34 ++++-- .../sourcemanager/engine/mock/engine_mock.go | 28 +++++ .../engine/pebble/event_sorter.go | 111 ++++++++++++++---- cdc/processor/sourcemanager/manager.go | 25 ++-- 10 files changed, 231 insertions(+), 74 deletions(-) diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index a58c2763553..703ad817323 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -500,7 +500,7 @@ func (p *processor) GetTableStatus(tableID model.TableID) tablepb.TableStatus { } } -func (p *processor) getStatsFromSourceManagerAndSinkManager(tableID model.TableID, sinkStats pipeline.Stats) tablepb.Stats { +func (p *processor) getStatsFromSourceManagerAndSinkManager(tableID model.TableID, sinkStats sinkmanager.TableStats) tablepb.Stats { pullerStats := p.sourceManager.GetTablePullerStats(tableID) now, _ := p.upstream.PDClock.CurrentTime() @@ -524,16 +524,15 @@ func (p *processor) getStatsFromSourceManagerAndSinkManager(tableID model.TableI }, } - // FIXME: add the stats of the sort engine. 
- //sortStats := p.sourceManager.GetTableSortStats(tableID) - //stats.StageCheckpoints["sorter-ingress"] = tablepb.Checkpoint{ - // CheckpointTs: sortStats.CheckpointTsIngress, - // ResolvedTs: sortStats.ResolvedTsIngress, - //} - //stats.StageCheckpoints["sorter-egress"] = tablepb.Checkpoint{ - // CheckpointTs: sortStats.CheckpointTsEgress, - // ResolvedTs: sortStats.ResolvedTsEgress, - //} + sortStats := p.sourceManager.GetTableSorterStats(tableID) + stats.StageCheckpoints["sorter-ingress"] = tablepb.Checkpoint{ + CheckpointTs: sortStats.ReceivedMaxCommitTs, + ResolvedTs: sortStats.ReceivedMaxResolvedTs, + } + stats.StageCheckpoints["sorter-egress"] = tablepb.Checkpoint{ + CheckpointTs: sinkStats.ReceivedMaxCommitTs, + ResolvedTs: sinkStats.ReceivedMaxCommitTs, + } return stats } @@ -1252,10 +1251,15 @@ func (p *processor) doGCSchemaStorage() { } func (p *processor) refreshMetrics() { - var totalConsumed uint64 - var totalEvents int64 - if !p.pullBasedSinking { - + if p.pullBasedSinking { + tables := p.sinkManager.GetAllCurrentTableIDs() + p.metricSyncTableNumGauge.Set(float64(len(tables))) + sortEngineReceivedEvents := p.sourceManager.ReceivedEvents() + tableSinksReceivedEvents := p.sinkManager.ReceivedEvents() + p.metricRemainKVEventGauge.Set(float64(sortEngineReceivedEvents - tableSinksReceivedEvents)) + } else { + var totalConsumed uint64 + var totalEvents int64 for _, table := range p.tables { consumed := table.MemoryConsumption() p.metricsTableMemoryHistogram.Observe(float64(consumed)) diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go index 113c3308f48..cd68d06262c 100644 --- a/cdc/processor/sinkmanager/manager.go +++ b/cdc/processor/sinkmanager/manager.go @@ -23,7 +23,6 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/entry" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/cdc/processor/pipeline" "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine" 
"github.com/pingcap/tiflow/cdc/processor/tablepb" "github.com/pingcap/tiflow/cdc/redo" @@ -48,6 +47,16 @@ type gcEvent struct { cleanPos engine.Position } +// TableStats of a table sink. +type TableStats struct { + CheckpointTs model.Ts + ResolvedTs model.Ts + BarrierTs model.Ts + // From sorter. + ReceivedMaxCommitTs model.Ts + ReceivedMaxResolvedTs model.Ts +} + // SinkManager is the implementation of SinkManager. type SinkManager struct { changefeedID model.ChangeFeedID @@ -626,7 +635,7 @@ func (m *SinkManager) GetTableState(tableID model.TableID) (tablepb.TableState, } // GetTableStats returns the state of the table. -func (m *SinkManager) GetTableStats(tableID model.TableID) pipeline.Stats { +func (m *SinkManager) GetTableStats(tableID model.TableID) TableStats { tableSink, ok := m.tableSinks.Load(tableID) if !ok { log.Panic("Table sink not found when getting table stats", @@ -661,13 +670,25 @@ func (m *SinkManager) GetTableStats(tableID model.TableID) pipeline.Stats { } else { resolvedTs = m.sortEngine.GetResolvedTs(tableID) } - return pipeline.Stats{ - CheckpointTs: resolvedMark, - ResolvedTs: resolvedTs, - BarrierTs: m.lastBarrierTs.Load(), + return TableStats{ + CheckpointTs: resolvedMark, + ResolvedTs: resolvedTs, + BarrierTs: m.lastBarrierTs.Load(), + ReceivedMaxCommitTs: tableSink.(*tableSinkWrapper).getReceivedSorterCommitTs(), + ReceivedMaxResolvedTs: tableSink.(*tableSinkWrapper).getReceivedSorterResolvedTs(), } } +// ReceivedEvents returns the number of events received by all table sinks. +func (m *SinkManager) ReceivedEvents() int64 { + totalReceivedEvents := int64(0) + m.tableSinks.Range(func(_, value interface{}) bool { + totalReceivedEvents += value.(*tableSinkWrapper).getReceivedEventCount() + return true + }) + return totalReceivedEvents +} + // Close closes all workers. 
func (m *SinkManager) Close() error { if m.cancel != nil { diff --git a/cdc/processor/sinkmanager/redo_log_worker.go b/cdc/processor/sinkmanager/redo_log_worker.go index 7775559cb37..32143121386 100644 --- a/cdc/processor/sinkmanager/redo_log_worker.go +++ b/cdc/processor/sinkmanager/redo_log_worker.go @@ -205,6 +205,7 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) error { lastPos = pos } + task.tableSink.updateReceivedSorterCommitTs(e.CRTs) if e.Row == nil { // NOTICE: This could happen when the event is filtered by the event filter. continue diff --git a/cdc/processor/sinkmanager/table_sink_worker.go b/cdc/processor/sinkmanager/table_sink_worker.go index 89bd6527ded..aca4a76bd0b 100644 --- a/cdc/processor/sinkmanager/table_sink_worker.go +++ b/cdc/processor/sinkmanager/table_sink_worker.go @@ -187,6 +187,12 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (err error) ) break } + // If redo log is enabled, we do not need to update this value. + // Because it already has been updated in the redo log worker. + if w.eventCache == nil { + task.tableSink.updateReceivedSorterCommitTs(e.CRTs) + } + task.tableSink.receivedEventCount.Add(1) if e.Row == nil { // NOTICE: This could happen when the event is filtered by the event filter. // Maybe we just ignore the last event. So we need to record the last position. @@ -428,7 +434,7 @@ func (w *sinkWorker) fetchFromCache( // time is ok. rawEventCount := 0 rows, size, pos := w.eventCache.pop(task.tableID, &rawEventCount, upperBound) - // TODO: record rawEventCount. 
+ task.tableSink.receivedEventCount.Add(int64(rawEventCount)) if size > 0 { w.metricRedoEventCacheHit.Add(float64(size)) } diff --git a/cdc/processor/sinkmanager/table_sink_wrapper.go b/cdc/processor/sinkmanager/table_sink_wrapper.go index 1c9fb54ac04..bdeeb2e60fa 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper.go @@ -47,6 +47,11 @@ type tableSinkWrapper struct { // receivedSorterResolvedTs is the resolved ts received from the sorter. // We use this to advance the redo log. receivedSorterResolvedTs atomic.Uint64 + // receivedSorterCommitTs is the commit ts received from the sorter. + // We use this to measure the latency of the table sorter. + receivedSorterCommitTs atomic.Uint64 + // receivedEventCount is the number of events received from the sorter. + receivedEventCount atomic.Int64 } func newTableSinkWrapper( @@ -89,6 +94,12 @@ func (t *tableSinkWrapper) updateReceivedSorterResolvedTs(ts model.Ts) { t.receivedSorterResolvedTs.Store(ts) } +func (t *tableSinkWrapper) updateReceivedSorterCommitTs(ts model.Ts) { + if ts > t.receivedSorterCommitTs.Load() { + t.receivedSorterCommitTs.Store(ts) + } +} + func (t *tableSinkWrapper) updateResolvedTs(ts model.ResolvedTs) error { if err := t.tableSink.UpdateResolvedTs(ts); err != nil { return errors.Trace(err) @@ -104,6 +115,14 @@ func (t *tableSinkWrapper) getReceivedSorterResolvedTs() model.Ts { return t.receivedSorterResolvedTs.Load() } +func (t *tableSinkWrapper) getReceivedSorterCommitTs() model.Ts { + return t.receivedSorterCommitTs.Load() +} + +func (t *tableSinkWrapper) getReceivedEventCount() int64 { + return t.receivedEventCount.Load() +} + func (t *tableSinkWrapper) getState() tablepb.TableState { return t.state.Load() } diff --git a/cdc/processor/sourcemanager/engine/engine.go b/cdc/processor/sourcemanager/engine/engine.go index da39938f321..b101b6ee454 100644 --- a/cdc/processor/sourcemanager/engine/engine.go +++ 
b/cdc/processor/sourcemanager/engine/engine.go @@ -70,6 +70,12 @@ type SortEngine interface { // NOTE: It's only available if IsTableBased returns false. CleanAllTables(upperBound Position) error + // GetStatsByTable gets the statistics of the given table. + GetStatsByTable(tableID model.TableID) TableStats + + // ReceivedEvents returns the number of events received by the sort engine. + ReceivedEvents() int64 + // Close closes the engine. All data written by this instance can be deleted. // // NOTE: it leads an undefined behavior to close an engine with active iterators. @@ -129,3 +135,9 @@ func (p Position) Compare(q Position) int { return 1 } } + +// TableStats of a sort engine. +type TableStats struct { + ReceivedMaxCommitTs model.Ts + ReceivedMaxResolvedTs model.Ts +} diff --git a/cdc/processor/sourcemanager/engine/memory/event_sorter.go b/cdc/processor/sourcemanager/engine/memory/event_sorter.go index 5e05c94106c..e76af1729d3 100644 --- a/cdc/processor/sourcemanager/engine/memory/event_sorter.go +++ b/cdc/processor/sourcemanager/engine/memory/event_sorter.go @@ -50,26 +50,26 @@ func New(_ context.Context) *EventSorter { return &EventSorter{} } -// IsTableBased implements sorter.EventSortEngine. +// IsTableBased implements engine.SortEngine. func (s *EventSorter) IsTableBased() bool { return true } -// AddTable implements sorter.EventSortEngine. +// AddTable implements engine.SortEngine. func (s *EventSorter) AddTable(tableID model.TableID) { if _, exists := s.tables.LoadOrStore(tableID, &tableSorter{}); exists { log.Panic("add an exist table", zap.Int64("tableID", tableID)) } } -// RemoveTable implements sorter.EventSortEngine. +// RemoveTable implements engine.SortEngine. func (s *EventSorter) RemoveTable(tableID model.TableID) { if _, exists := s.tables.LoadAndDelete(tableID); !exists { log.Panic("remove an unexist table", zap.Int64("tableID", tableID)) } } -// Add implements sorter.EventSortEngine. +// Add implements engine.SortEngine. 
func (s *EventSorter) Add(tableID model.TableID, events ...*model.PolymorphicEvent) (err error) { value, exists := s.tables.Load(tableID) if !exists { @@ -87,7 +87,7 @@ func (s *EventSorter) Add(tableID model.TableID, events ...*model.PolymorphicEve return nil } -// GetResolvedTs implements sorter.EventSortEngine. +// GetResolvedTs implements engine.SortEngine. func (s *EventSorter) GetResolvedTs(tableID model.TableID) model.Ts { value, exists := s.tables.Load(tableID) if !exists { @@ -97,14 +97,14 @@ func (s *EventSorter) GetResolvedTs(tableID model.TableID) model.Ts { return value.(*tableSorter).getResolvedTs() } -// OnResolve implements sorter.EventSortEngine. +// OnResolve implements engine.SortEngine. func (s *EventSorter) OnResolve(action func(model.TableID, model.Ts)) { s.mu.Lock() defer s.mu.Unlock() s.onResolves = append(s.onResolves, action) } -// FetchByTable implements sorter.EventSortEngine. +// FetchByTable implements engine.SortEngine. func (s *EventSorter) FetchByTable(tableID model.TableID, lowerBound, upperBound engine.Position) engine.EventIterator { value, exists := s.tables.Load(tableID) if !exists { @@ -114,13 +114,13 @@ func (s *EventSorter) FetchByTable(tableID model.TableID, lowerBound, upperBound return value.(*tableSorter).fetch(tableID, lowerBound, upperBound) } -// FetchAllTables implements sorter.EventSortEngine. +// FetchAllTables implements engine.SortEngine. func (s *EventSorter) FetchAllTables(lowerBound engine.Position) engine.EventIterator { log.Panic("FetchAllTables should never be called") return nil } -// CleanByTable implements sorter.EventSortEngine. +// CleanByTable implements engine.SortEngine. func (s *EventSorter) CleanByTable(tableID model.TableID, upperBound engine.Position) error { value, exists := s.tables.Load(tableID) if !exists { @@ -131,13 +131,25 @@ func (s *EventSorter) CleanByTable(tableID model.TableID, upperBound engine.Posi return nil } -// CleanAllTables implements sorter.EventSortEngine. 
+// CleanAllTables implements engine.SortEngine. func (s *EventSorter) CleanAllTables(upperBound engine.Position) error { log.Panic("CleanAllTables should never be called") return nil } -// Close implements sorter.EventSortEngine. +// GetStatsByTable implements engine.SortEngine. +func (s *EventSorter) GetStatsByTable(tableID model.TableID) engine.TableStats { + log.Panic("GetStatsByTable should never be called") + return engine.TableStats{} +} + +// ReceivedEvents implements engine.SortEngine. +func (s *EventSorter) ReceivedEvents() int64 { + log.Panic("ReceivedEvents should never be called") + return 0 +} + +// Close implements engine.SortEngine. func (s *EventSorter) Close() error { s.tables = sync.Map{} return nil diff --git a/cdc/processor/sourcemanager/engine/mock/engine_mock.go b/cdc/processor/sourcemanager/engine/mock/engine_mock.go index 5995b1aabfd..d50e11ed4dd 100644 --- a/cdc/processor/sourcemanager/engine/mock/engine_mock.go +++ b/cdc/processor/sourcemanager/engine/mock/engine_mock.go @@ -150,6 +150,20 @@ func (mr *MockSortEngineMockRecorder) GetResolvedTs(tableID interface{}) *gomock return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetResolvedTs", reflect.TypeOf((*MockSortEngine)(nil).GetResolvedTs), tableID) } +// GetStatsByTable mocks base method. +func (m *MockSortEngine) GetStatsByTable(tableID model.TableID) engine.TableStats { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetStatsByTable", tableID) + ret0, _ := ret[0].(engine.TableStats) + return ret0 +} + +// GetStatsByTable indicates an expected call of GetStatsByTable. +func (mr *MockSortEngineMockRecorder) GetStatsByTable(tableID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetStatsByTable", reflect.TypeOf((*MockSortEngine)(nil).GetStatsByTable), tableID) +} + // IsTableBased mocks base method. 
func (m *MockSortEngine) IsTableBased() bool { m.ctrl.T.Helper() @@ -176,6 +190,20 @@ func (mr *MockSortEngineMockRecorder) OnResolve(action interface{}) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnResolve", reflect.TypeOf((*MockSortEngine)(nil).OnResolve), action) } +// ReceivedEvents mocks base method. +func (m *MockSortEngine) ReceivedEvents() int64 { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ReceivedEvents") + ret0, _ := ret[0].(int64) + return ret0 +} + +// ReceivedEvents indicates an expected call of ReceivedEvents. +func (mr *MockSortEngineMockRecorder) ReceivedEvents() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReceivedEvents", reflect.TypeOf((*MockSortEngine)(nil).ReceivedEvents)) +} + // RemoveTable mocks base method. func (m *MockSortEngine) RemoveTable(tableID model.TableID) { m.ctrl.T.Helper() diff --git a/cdc/processor/sourcemanager/engine/pebble/event_sorter.go b/cdc/processor/sourcemanager/engine/pebble/event_sorter.go index 685ea74a484..7475c857c57 100644 --- a/cdc/processor/sourcemanager/engine/pebble/event_sorter.go +++ b/cdc/processor/sourcemanager/engine/pebble/event_sorter.go @@ -57,6 +57,7 @@ type EventSorter struct { // EventIter implements sorter.EventIterator. type EventIter struct { tableID model.TableID + state *tableState iter *pebble.Iterator headItem *model.PolymorphicEvent serde encoding.MsgPackGenSerde @@ -89,12 +90,12 @@ func New(ID model.ChangeFeedID, dbs []*pebble.DB) *EventSorter { return eventSorter } -// IsTableBased implements sorter.EventSortEngine. +// IsTableBased implements engine.SortEngine. func (s *EventSorter) IsTableBased() bool { return true } -// AddTable implements sorter.EventSortEngine. +// AddTable implements engine.SortEngine. 
func (s *EventSorter) AddTable(tableID model.TableID) { s.mu.Lock() if _, exists := s.tables[tableID]; exists { @@ -109,7 +110,7 @@ func (s *EventSorter) AddTable(tableID model.TableID) { s.mu.Unlock() } -// RemoveTable implements sorter.EventSortEngine. +// RemoveTable implements engine.SortEngine. func (s *EventSorter) RemoveTable(tableID model.TableID) { s.mu.Lock() if _, exists := s.tables[tableID]; !exists { @@ -124,52 +125,69 @@ func (s *EventSorter) RemoveTable(tableID model.TableID) { s.mu.Unlock() } -// Add implements sorter.EventSortEngine. -func (s *EventSorter) Add(tableID model.TableID, events ...*model.PolymorphicEvent) (err error) { +// Add implements engine.SortEngine. +func (s *EventSorter) Add(tableID model.TableID, events ...*model.PolymorphicEvent) error { s.mu.RLock() state, exists := s.tables[tableID] s.mu.RUnlock() if !exists { - log.Panic("add events into an unexist table", + log.Panic("add events into an non-existent table", zap.String("namespace", s.changefeedID.Namespace), zap.String("changefeed", s.changefeedID.ID), zap.Int64("tableID", tableID)) } + maxCommitTs := model.Ts(0) + maxResolvedTs := model.Ts(0) for _, event := range events { state.ch.In() <- eventWithTableID{tableID, event} if event.IsResolved() { - atomic.StoreUint64(&state.pendingResolved, event.CRTs) + if event.CRTs > maxResolvedTs { + maxResolvedTs = event.CRTs + } + } else { + state.receivedEvents.Add(1) + if event.CRTs > maxCommitTs { + maxCommitTs = event.CRTs + } } } - return + + if maxCommitTs > state.maxReceivedCommitTs.Load() { + state.maxReceivedCommitTs.Store(maxCommitTs) + } + if maxResolvedTs > state.maxReceivedResolvedTs.Load() { + state.maxReceivedResolvedTs.Store(maxResolvedTs) + } + + return nil } -// GetResolvedTs implements sorter.EventSortEngine. +// GetResolvedTs implements engine.SortEngine. 
func (s *EventSorter) GetResolvedTs(tableID model.TableID) model.Ts { s.mu.RLock() state, exists := s.tables[tableID] s.mu.RUnlock() if !exists { - log.Panic("get resolved ts from an unexist table", + log.Panic("get resolved ts from an non-existent table", zap.String("namespace", s.changefeedID.Namespace), zap.String("changefeed", s.changefeedID.ID), zap.Int64("tableID", tableID)) } - return atomic.LoadUint64(&state.pendingResolved) + return state.maxReceivedResolvedTs.Load() } -// OnResolve implements sorter.EventSortEngine. +// OnResolve implements engine.SortEngine. func (s *EventSorter) OnResolve(action func(model.TableID, model.Ts)) { s.mu.Lock() defer s.mu.Unlock() s.onResolves = append(s.onResolves, action) } -// FetchByTable implements sorter.EventSortEngine. +// FetchByTable implements engine.SortEngine. func (s *EventSorter) FetchByTable( tableID model.TableID, lowerBound, upperBound engine.Position, @@ -179,13 +197,13 @@ func (s *EventSorter) FetchByTable( s.mu.RUnlock() if !exists { - log.Panic("fetch events from an unexist table", + log.Panic("fetch events from an non-existent table", zap.String("namespace", s.changefeedID.Namespace), zap.String("changefeed", s.changefeedID.ID), zap.Int64("tableID", tableID)) } - sortedResolved := atomic.LoadUint64(&state.sortedResolved) + sortedResolved := state.sortedResolved.Load() if upperBound.CommitTs > sortedResolved { log.Panic("fetch unresolved events", zap.String("namespace", s.changefeedID.Namespace), @@ -197,10 +215,10 @@ func (s *EventSorter) FetchByTable( db := s.dbs[getDB(tableID, len(s.dbs))] iter := iterTable(db, s.uniqueID, tableID, lowerBound, upperBound) - return &EventIter{tableID: tableID, iter: iter, serde: s.serde} + return &EventIter{tableID: tableID, state: state, iter: iter, serde: s.serde} } -// FetchAllTables implements sorter.EventSortEngine. +// FetchAllTables implements engine.SortEngine. 
func (s *EventSorter) FetchAllTables(lowerBound engine.Position) engine.EventIterator { log.Panic("FetchAllTables should never be called", zap.String("namespace", s.changefeedID.Namespace), @@ -208,14 +226,14 @@ func (s *EventSorter) FetchAllTables(lowerBound engine.Position) engine.EventIte return nil } -// CleanByTable implements sorter.EventSortEngine. +// CleanByTable implements engine.SortEngine. func (s *EventSorter) CleanByTable(tableID model.TableID, upperBound engine.Position) error { s.mu.RLock() state, exists := s.tables[tableID] s.mu.RUnlock() if !exists { - log.Panic("clean an unexist table", + log.Panic("clean an non-existent table", zap.String("namespace", s.changefeedID.Namespace), zap.String("changefeed", s.changefeedID.ID), zap.Int64("tableID", tableID)) @@ -224,7 +242,7 @@ func (s *EventSorter) CleanByTable(tableID model.TableID, upperBound engine.Posi return s.cleanTable(state, tableID, upperBound) } -// CleanAllTables implements sorter.EventSortEngine. +// CleanAllTables implements engine.SortEngine. func (s *EventSorter) CleanAllTables(upperBound engine.Position) error { log.Panic("CleanAllTables should never be called", zap.String("namespace", s.changefeedID.Namespace), @@ -232,7 +250,44 @@ func (s *EventSorter) CleanAllTables(upperBound engine.Position) error { return nil } -// Close implements sorter.EventSortEngine. +// GetStatsByTable implements engine.SortEngine. 
+func (s *EventSorter) GetStatsByTable(tableID model.TableID) engine.TableStats { + s.mu.RLock() + state, exists := s.tables[tableID] + s.mu.RUnlock() + + if !exists { + log.Panic("Get stats from an non-existent table", + zap.String("namespace", s.changefeedID.Namespace), + zap.String("changefeed", s.changefeedID.ID), + zap.Int64("tableID", tableID)) + } + + maxCommitTs := state.maxReceivedCommitTs.Load() + maxResolvedTs := state.maxReceivedResolvedTs.Load() + if maxCommitTs < maxResolvedTs { + // In case, there is no write for the table, + // we use maxResolvedTs as maxCommitTs to make the stats meaningful. + maxCommitTs = maxResolvedTs + } + return engine.TableStats{ + ReceivedMaxCommitTs: maxCommitTs, + ReceivedMaxResolvedTs: maxResolvedTs, + } +} + +// ReceivedEvents implements engine.SortEngine. +func (s *EventSorter) ReceivedEvents() int64 { + s.mu.Lock() + defer s.mu.Unlock() + totalReceivedEvents := int64(0) + for _, state := range s.tables { + totalReceivedEvents += state.receivedEvents.Load() + } + return totalReceivedEvents +} + +// Close implements engine.SortEngine. func (s *EventSorter) Close() error { s.mu.Lock() if s.isClosed { @@ -299,9 +354,12 @@ type eventWithTableID struct { } type tableState struct { - ch *chann.Chann[eventWithTableID] - sortedResolved uint64 // indicates events are ready for fetching. - pendingResolved uint64 // events are resolved but not sorted. + ch *chann.Chann[eventWithTableID] + sortedResolved atomic.Uint64 // indicates events are ready for fetching. + // For statistics. + maxReceivedCommitTs atomic.Uint64 + maxReceivedResolvedTs atomic.Uint64 + receivedEvents atomic.Int64 // Following fields are protected by mu. 
mu sync.RWMutex @@ -374,7 +432,7 @@ func (s *EventSorter) handleEvents(db *pebble.DB, inputCh <-chan eventWithTableI s.mu.RUnlock() continue } - atomic.StoreUint64(&ts.sortedResolved, resolved) + ts.sortedResolved.Store(resolved) for _, onResolve := range s.onResolves { onResolve(table, resolved) } @@ -416,7 +474,7 @@ func (s *EventSorter) cleanTable(state *tableState, tableID model.TableID, upper return nil } -// / ----- Some internal variable and functions ----- /// +// ----- Some internal variable and functions ----- const batchCommitCount uint32 = 1024 var uniqueIDGen uint32 = 0 @@ -425,6 +483,7 @@ func genUniqueID() uint32 { return atomic.AddUint32(&uniqueIDGen, 1) } +// TODO: add test for this function. func getDB(tableID model.TableID, dbCount int) int { h := fnv.New64() b := [8]byte{} diff --git a/cdc/processor/sourcemanager/manager.go b/cdc/processor/sourcemanager/manager.go index fa43d337088..1bbcb1201a9 100644 --- a/cdc/processor/sourcemanager/manager.go +++ b/cdc/processor/sourcemanager/manager.go @@ -57,11 +57,6 @@ func New( } } -// IsTableBased just wrap the engine's IsTableBased method. -func (m *SourceManager) IsTableBased() bool { - return m.engine.IsTableBased() -} - // AddTable adds a table to the source manager. Start puller and register table to the engine. func (m *SourceManager) AddTable(ctx cdccontext.Context, tableID model.TableID, tableName string, startTs model.Ts) { p := pullerwrapper.NewPullerWrapper(m.changefeedID, tableID, tableName, startTs) @@ -89,21 +84,11 @@ func (m *SourceManager) FetchByTable(tableID model.TableID, lowerBound, upperBou return m.engine.FetchByTable(tableID, lowerBound, upperBound) } -// FetchAllTables just wrap the engine's FetchAllTables method. -func (m *SourceManager) FetchAllTables(lowerBound engine.Position) engine.EventIterator { - return m.engine.FetchAllTables(lowerBound) -} - // CleanByTable just wrap the engine's CleanByTable method. 
func (m *SourceManager) CleanByTable(tableID model.TableID, upperBound engine.Position) error { return m.engine.CleanByTable(tableID, upperBound) } -// CleanAllTables just wrap the engine's CleanAllTables method. -func (m *SourceManager) CleanAllTables(upperBound engine.Position) error { - return m.engine.CleanAllTables(upperBound) -} - // GetTablePullerStats returns the puller stats of the table. func (m *SourceManager) GetTablePullerStats(tableID model.TableID) puller.Stats { p, ok := m.pullers.Load(tableID) @@ -116,6 +101,16 @@ func (m *SourceManager) GetTablePullerStats(tableID model.TableID) puller.Stats return p.(*pullerwrapper.Wrapper).GetStats() } +// GetTableSorterStats returns the sorter stats of the table. +func (m *SourceManager) GetTableSorterStats(tableID model.TableID) engine.TableStats { + return m.engine.GetStatsByTable(tableID) +} + +// ReceivedEvents returns the number of events received by the sort engine. +func (m *SourceManager) ReceivedEvents() int64 { + return m.engine.ReceivedEvents() +} + // Close closes the source manager. Stop all pullers and close the engine. func (m *SourceManager) Close() error { m.pullers.Range(func(key, value interface{}) bool {