Skip to content

Commit

Permalink
processor/sourcemanager(ticdc): add sorter stats (#7748)
Browse files Browse the repository at this point in the history
ref #5928
  • Loading branch information
Rustin170506 authored Dec 1, 2022
1 parent 8579aaf commit 9c3eddb
Show file tree
Hide file tree
Showing 10 changed files with 231 additions and 74 deletions.
34 changes: 19 additions & 15 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ func (p *processor) GetTableStatus(tableID model.TableID) tablepb.TableStatus {
}
}

func (p *processor) getStatsFromSourceManagerAndSinkManager(tableID model.TableID, sinkStats pipeline.Stats) tablepb.Stats {
func (p *processor) getStatsFromSourceManagerAndSinkManager(tableID model.TableID, sinkStats sinkmanager.TableStats) tablepb.Stats {
pullerStats := p.sourceManager.GetTablePullerStats(tableID)
now, _ := p.upstream.PDClock.CurrentTime()

Expand All @@ -524,16 +524,15 @@ func (p *processor) getStatsFromSourceManagerAndSinkManager(tableID model.TableI
},
}

// FIXME: add the stats of the sort engine.
//sortStats := p.sourceManager.GetTableSortStats(tableID)
//stats.StageCheckpoints["sorter-ingress"] = tablepb.Checkpoint{
// CheckpointTs: sortStats.CheckpointTsIngress,
// ResolvedTs: sortStats.ResolvedTsIngress,
//}
//stats.StageCheckpoints["sorter-egress"] = tablepb.Checkpoint{
// CheckpointTs: sortStats.CheckpointTsEgress,
// ResolvedTs: sortStats.ResolvedTsEgress,
//}
sortStats := p.sourceManager.GetTableSorterStats(tableID)
stats.StageCheckpoints["sorter-ingress"] = tablepb.Checkpoint{
CheckpointTs: sortStats.ReceivedMaxCommitTs,
ResolvedTs: sortStats.ReceivedMaxResolvedTs,
}
stats.StageCheckpoints["sorter-egress"] = tablepb.Checkpoint{
CheckpointTs: sinkStats.ReceivedMaxCommitTs,
ResolvedTs: sinkStats.ReceivedMaxCommitTs,
}

return stats
}
Expand Down Expand Up @@ -1252,10 +1251,15 @@ func (p *processor) doGCSchemaStorage() {
}

func (p *processor) refreshMetrics() {
var totalConsumed uint64
var totalEvents int64
if !p.pullBasedSinking {

if p.pullBasedSinking {
tables := p.sinkManager.GetAllCurrentTableIDs()
p.metricSyncTableNumGauge.Set(float64(len(tables)))
sortEngineReceivedEvents := p.sourceManager.ReceivedEvents()
tableSinksReceivedEvents := p.sinkManager.ReceivedEvents()
p.metricRemainKVEventGauge.Set(float64(sortEngineReceivedEvents - tableSinksReceivedEvents))
} else {
var totalConsumed uint64
var totalEvents int64
for _, table := range p.tables {
consumed := table.MemoryConsumption()
p.metricsTableMemoryHistogram.Observe(float64(consumed))
Expand Down
33 changes: 27 additions & 6 deletions cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/pipeline"
"github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/cdc/redo"
Expand All @@ -48,6 +47,16 @@ type gcEvent struct {
cleanPos engine.Position
}

// TableStats of a table sink.
type TableStats struct {
CheckpointTs model.Ts
ResolvedTs model.Ts
BarrierTs model.Ts
// From sorter.
ReceivedMaxCommitTs model.Ts
ReceivedMaxResolvedTs model.Ts
}

// SinkManager is the implementation of SinkManager.
type SinkManager struct {
changefeedID model.ChangeFeedID
Expand Down Expand Up @@ -626,7 +635,7 @@ func (m *SinkManager) GetTableState(tableID model.TableID) (tablepb.TableState,
}

// GetTableStats returns the state of the table.
func (m *SinkManager) GetTableStats(tableID model.TableID) pipeline.Stats {
func (m *SinkManager) GetTableStats(tableID model.TableID) TableStats {
tableSink, ok := m.tableSinks.Load(tableID)
if !ok {
log.Panic("Table sink not found when getting table stats",
Expand Down Expand Up @@ -661,13 +670,25 @@ func (m *SinkManager) GetTableStats(tableID model.TableID) pipeline.Stats {
} else {
resolvedTs = m.sortEngine.GetResolvedTs(tableID)
}
return pipeline.Stats{
CheckpointTs: resolvedMark,
ResolvedTs: resolvedTs,
BarrierTs: m.lastBarrierTs.Load(),
return TableStats{
CheckpointTs: resolvedMark,
ResolvedTs: resolvedTs,
BarrierTs: m.lastBarrierTs.Load(),
ReceivedMaxCommitTs: tableSink.(*tableSinkWrapper).getReceivedSorterCommitTs(),
ReceivedMaxResolvedTs: tableSink.(*tableSinkWrapper).getReceivedSorterResolvedTs(),
}
}

// ReceivedEvents returns the number of events received by all table sinks.
func (m *SinkManager) ReceivedEvents() int64 {
totalReceivedEvents := int64(0)
m.tableSinks.Range(func(_, value interface{}) bool {
totalReceivedEvents += value.(*tableSinkWrapper).getReceivedEventCount()
return true
})
return totalReceivedEvents
}

// Close closes all workers.
func (m *SinkManager) Close() error {
if m.cancel != nil {
Expand Down
1 change: 1 addition & 0 deletions cdc/processor/sinkmanager/redo_log_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) error {
lastPos = pos
}

task.tableSink.updateReceivedSorterCommitTs(e.CRTs)
if e.Row == nil {
// NOTICE: This could happen when the event is filtered by the event filter.
continue
Expand Down
8 changes: 7 additions & 1 deletion cdc/processor/sinkmanager/table_sink_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,12 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (err error)
)
break
}
// If redo log is enabled, we do not need to update this value.
// Because it already has been updated in the redo log worker.
if w.eventCache == nil {
task.tableSink.updateReceivedSorterCommitTs(e.CRTs)
}
task.tableSink.receivedEventCount.Add(1)
if e.Row == nil {
// NOTICE: This could happen when the event is filtered by the event filter.
// Maybe we just ignore the last event. So we need to record the last position.
Expand Down Expand Up @@ -428,7 +434,7 @@ func (w *sinkWorker) fetchFromCache(
// time is ok.
rawEventCount := 0
rows, size, pos := w.eventCache.pop(task.tableID, &rawEventCount, upperBound)
// TODO: record rawEventCount.
task.tableSink.receivedEventCount.Add(int64(rawEventCount))
if size > 0 {
w.metricRedoEventCacheHit.Add(float64(size))
}
Expand Down
19 changes: 19 additions & 0 deletions cdc/processor/sinkmanager/table_sink_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ type tableSinkWrapper struct {
// receivedSorterResolvedTs is the resolved ts received from the sorter.
// We use this to advance the redo log.
receivedSorterResolvedTs atomic.Uint64
// receivedSorterCommitTs is the commit ts received from the sorter.
// We use this to statistics the latency of the table sorter.
receivedSorterCommitTs atomic.Uint64
// receivedEventCount is the number of events received from the sorter.
receivedEventCount atomic.Int64
}

func newTableSinkWrapper(
Expand Down Expand Up @@ -89,6 +94,12 @@ func (t *tableSinkWrapper) updateReceivedSorterResolvedTs(ts model.Ts) {
t.receivedSorterResolvedTs.Store(ts)
}

func (t *tableSinkWrapper) updateReceivedSorterCommitTs(ts model.Ts) {
if ts > t.receivedSorterCommitTs.Load() {
t.receivedSorterCommitTs.Store(ts)
}
}

func (t *tableSinkWrapper) updateResolvedTs(ts model.ResolvedTs) error {
if err := t.tableSink.UpdateResolvedTs(ts); err != nil {
return errors.Trace(err)
Expand All @@ -104,6 +115,14 @@ func (t *tableSinkWrapper) getReceivedSorterResolvedTs() model.Ts {
return t.receivedSorterResolvedTs.Load()
}

func (t *tableSinkWrapper) getReceivedSorterCommitTs() model.Ts {
return t.receivedSorterCommitTs.Load()
}

func (t *tableSinkWrapper) getReceivedEventCount() int64 {
return t.receivedEventCount.Load()
}

func (t *tableSinkWrapper) getState() tablepb.TableState {
return t.state.Load()
}
Expand Down
12 changes: 12 additions & 0 deletions cdc/processor/sourcemanager/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ type SortEngine interface {
// NOTE: It's only available if IsTableBased returns false.
CleanAllTables(upperBound Position) error

// GetStatsByTable gets the statistics of the given table.
GetStatsByTable(tableID model.TableID) TableStats

// ReceivedEvents returns the number of events received by the sort engine.
ReceivedEvents() int64

// Close closes the engine. All data written by this instance can be deleted.
//
// NOTE: it leads an undefined behavior to close an engine with active iterators.
Expand Down Expand Up @@ -129,3 +135,9 @@ func (p Position) Compare(q Position) int {
return 1
}
}

// TableStats of a sort engine.
type TableStats struct {
ReceivedMaxCommitTs model.Ts
ReceivedMaxResolvedTs model.Ts
}
34 changes: 23 additions & 11 deletions cdc/processor/sourcemanager/engine/memory/event_sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,26 +50,26 @@ func New(_ context.Context) *EventSorter {
return &EventSorter{}
}

// IsTableBased implements sorter.EventSortEngine.
// IsTableBased implements engine.SortEngine.
func (s *EventSorter) IsTableBased() bool {
return true
}

// AddTable implements sorter.EventSortEngine.
// AddTable implements engine.SortEngine.
func (s *EventSorter) AddTable(tableID model.TableID) {
if _, exists := s.tables.LoadOrStore(tableID, &tableSorter{}); exists {
log.Panic("add an exist table", zap.Int64("tableID", tableID))
}
}

// RemoveTable implements sorter.EventSortEngine.
// RemoveTable implements engine.SortEngine.
func (s *EventSorter) RemoveTable(tableID model.TableID) {
if _, exists := s.tables.LoadAndDelete(tableID); !exists {
log.Panic("remove an unexist table", zap.Int64("tableID", tableID))
}
}

// Add implements sorter.EventSortEngine.
// Add implements engine.SortEngine.
func (s *EventSorter) Add(tableID model.TableID, events ...*model.PolymorphicEvent) (err error) {
value, exists := s.tables.Load(tableID)
if !exists {
Expand All @@ -87,7 +87,7 @@ func (s *EventSorter) Add(tableID model.TableID, events ...*model.PolymorphicEve
return nil
}

// GetResolvedTs implements sorter.EventSortEngine.
// GetResolvedTs implements engine.SortEngine.
func (s *EventSorter) GetResolvedTs(tableID model.TableID) model.Ts {
value, exists := s.tables.Load(tableID)
if !exists {
Expand All @@ -97,14 +97,14 @@ func (s *EventSorter) GetResolvedTs(tableID model.TableID) model.Ts {
return value.(*tableSorter).getResolvedTs()
}

// OnResolve implements sorter.EventSortEngine.
// OnResolve implements engine.SortEngine.
func (s *EventSorter) OnResolve(action func(model.TableID, model.Ts)) {
s.mu.Lock()
defer s.mu.Unlock()
s.onResolves = append(s.onResolves, action)
}

// FetchByTable implements sorter.EventSortEngine.
// FetchByTable implements engine.SortEngine.
func (s *EventSorter) FetchByTable(tableID model.TableID, lowerBound, upperBound engine.Position) engine.EventIterator {
value, exists := s.tables.Load(tableID)
if !exists {
Expand All @@ -114,13 +114,13 @@ func (s *EventSorter) FetchByTable(tableID model.TableID, lowerBound, upperBound
return value.(*tableSorter).fetch(tableID, lowerBound, upperBound)
}

// FetchAllTables implements sorter.EventSortEngine.
// FetchAllTables implements engine.SortEngine.
func (s *EventSorter) FetchAllTables(lowerBound engine.Position) engine.EventIterator {
log.Panic("FetchAllTables should never be called")
return nil
}

// CleanByTable implements sorter.EventSortEngine.
// CleanByTable implements engine.SortEngine.
func (s *EventSorter) CleanByTable(tableID model.TableID, upperBound engine.Position) error {
value, exists := s.tables.Load(tableID)
if !exists {
Expand All @@ -131,13 +131,25 @@ func (s *EventSorter) CleanByTable(tableID model.TableID, upperBound engine.Posi
return nil
}

// CleanAllTables implements sorter.EventSortEngine.
// CleanAllTables implements engine.SortEngine.
func (s *EventSorter) CleanAllTables(upperBound engine.Position) error {
log.Panic("CleanAllTables should never be called")
return nil
}

// Close implements sorter.EventSortEngine.
// GetStatsByTable implements engine.SortEngine.
func (s *EventSorter) GetStatsByTable(tableID model.TableID) engine.TableStats {
log.Panic("GetStatsByTable should never be called")
return engine.TableStats{}
}

// ReceivedEvents implements engine.SortEngine.
func (s *EventSorter) ReceivedEvents() int64 {
log.Panic("ReceivedEvents should never be called")
return 0
}

// Close implements engine.SortEngine.
func (s *EventSorter) Close() error {
s.tables = sync.Map{}
return nil
Expand Down
28 changes: 28 additions & 0 deletions cdc/processor/sourcemanager/engine/mock/engine_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 9c3eddb

Please sign in to comment.