Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

processor/sourcemanager(ticdc): add sorter stats #7748

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 19 additions & 15 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ func (p *processor) GetTableStatus(tableID model.TableID) tablepb.TableStatus {
}
}

func (p *processor) getStatsFromSourceManagerAndSinkManager(tableID model.TableID, sinkStats pipeline.Stats) tablepb.Stats {
func (p *processor) getStatsFromSourceManagerAndSinkManager(tableID model.TableID, sinkStats sinkmanager.TableStats) tablepb.Stats {
pullerStats := p.sourceManager.GetTablePullerStats(tableID)
now, _ := p.upstream.PDClock.CurrentTime()

Expand All @@ -524,16 +524,15 @@ func (p *processor) getStatsFromSourceManagerAndSinkManager(tableID model.TableI
},
}

// FIXME: add the stats of the sort engine.
//sortStats := p.sourceManager.GetTableSortStats(tableID)
//stats.StageCheckpoints["sorter-ingress"] = tablepb.Checkpoint{
// CheckpointTs: sortStats.CheckpointTsIngress,
// ResolvedTs: sortStats.ResolvedTsIngress,
//}
//stats.StageCheckpoints["sorter-egress"] = tablepb.Checkpoint{
// CheckpointTs: sortStats.CheckpointTsEgress,
// ResolvedTs: sortStats.ResolvedTsEgress,
//}
sortStats := p.sourceManager.GetTableSorterStats(tableID)
stats.StageCheckpoints["sorter-ingress"] = tablepb.Checkpoint{
CheckpointTs: sortStats.ReceivedMaxCommitTs,
ResolvedTs: sortStats.ReceivedMaxResolvedTs,
}
stats.StageCheckpoints["sorter-egress"] = tablepb.Checkpoint{
CheckpointTs: sinkStats.ReceivedMaxCommitTs,
ResolvedTs: sinkStats.ReceivedMaxCommitTs,
}

return stats
}
Expand Down Expand Up @@ -1252,10 +1251,15 @@ func (p *processor) doGCSchemaStorage() {
}

func (p *processor) refreshMetrics() {
var totalConsumed uint64
var totalEvents int64
if !p.pullBasedSinking {

if p.pullBasedSinking {
tables := p.sinkManager.GetAllCurrentTableIDs()
p.metricSyncTableNumGauge.Set(float64(len(tables)))
sortEngineReceivedEvents := p.sourceManager.ReceivedEvents()
tableSinksReceivedEvents := p.sinkManager.ReceivedEvents()
p.metricRemainKVEventGauge.Set(float64(sortEngineReceivedEvents - tableSinksReceivedEvents))
} else {
var totalConsumed uint64
var totalEvents int64
for _, table := range p.tables {
consumed := table.MemoryConsumption()
p.metricsTableMemoryHistogram.Observe(float64(consumed))
Expand Down
33 changes: 27 additions & 6 deletions cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/pipeline"
"github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/cdc/redo"
Expand All @@ -48,6 +47,16 @@ type gcEvent struct {
cleanPos engine.Position
}

// TableStats of a table sink.
type TableStats struct {
CheckpointTs model.Ts
ResolvedTs model.Ts
BarrierTs model.Ts
// From sorter.
ReceivedMaxCommitTs model.Ts
ReceivedMaxResolvedTs model.Ts
}

// SinkManager is the implementation of SinkManager.
type SinkManager struct {
changefeedID model.ChangeFeedID
Expand Down Expand Up @@ -626,7 +635,7 @@ func (m *SinkManager) GetTableState(tableID model.TableID) (tablepb.TableState,
}

// GetTableStats returns the state of the table.
func (m *SinkManager) GetTableStats(tableID model.TableID) pipeline.Stats {
func (m *SinkManager) GetTableStats(tableID model.TableID) TableStats {
tableSink, ok := m.tableSinks.Load(tableID)
if !ok {
log.Panic("Table sink not found when getting table stats",
Expand Down Expand Up @@ -661,13 +670,25 @@ func (m *SinkManager) GetTableStats(tableID model.TableID) pipeline.Stats {
} else {
resolvedTs = m.sortEngine.GetResolvedTs(tableID)
}
return pipeline.Stats{
CheckpointTs: resolvedMark,
ResolvedTs: resolvedTs,
BarrierTs: m.lastBarrierTs.Load(),
return TableStats{
CheckpointTs: resolvedMark,
ResolvedTs: resolvedTs,
BarrierTs: m.lastBarrierTs.Load(),
ReceivedMaxCommitTs: tableSink.(*tableSinkWrapper).getReceivedSorterCommitTs(),
ReceivedMaxResolvedTs: tableSink.(*tableSinkWrapper).getReceivedSorterResolvedTs(),
}
}

// ReceivedEvents returns the number of events received by all table sinks.
func (m *SinkManager) ReceivedEvents() int64 {
totalReceivedEvents := int64(0)
m.tableSinks.Range(func(_, value interface{}) bool {
totalReceivedEvents += value.(*tableSinkWrapper).getReceivedEventCount()
return true
})
return totalReceivedEvents
}

// Close closes all workers.
func (m *SinkManager) Close() error {
if m.cancel != nil {
Expand Down
1 change: 1 addition & 0 deletions cdc/processor/sinkmanager/redo_log_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) error {
lastPos = pos
}

task.tableSink.updateReceivedSorterCommitTs(e.CRTs)
if e.Row == nil {
// NOTICE: This could happen when the event is filtered by the event filter.
continue
Expand Down
8 changes: 7 additions & 1 deletion cdc/processor/sinkmanager/table_sink_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,12 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (err error)
)
break
}
// If redo log is enabled, we do not need to update this value.
// Because it already has been updated in the redo log worker.
if w.eventCache == nil {
task.tableSink.updateReceivedSorterCommitTs(e.CRTs)
}
task.tableSink.receivedEventCount.Add(1)
if e.Row == nil {
// NOTICE: This could happen when the event is filtered by the event filter.
// Maybe we just ignore the last event. So we need to record the last position.
Expand Down Expand Up @@ -428,7 +434,7 @@ func (w *sinkWorker) fetchFromCache(
// time is ok.
rawEventCount := 0
rows, size, pos := w.eventCache.pop(task.tableID, &rawEventCount, upperBound)
// TODO: record rawEventCount.
task.tableSink.receivedEventCount.Add(int64(rawEventCount))
if size > 0 {
w.metricRedoEventCacheHit.Add(float64(size))
}
Expand Down
19 changes: 19 additions & 0 deletions cdc/processor/sinkmanager/table_sink_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ type tableSinkWrapper struct {
// receivedSorterResolvedTs is the resolved ts received from the sorter.
// We use this to advance the redo log.
receivedSorterResolvedTs atomic.Uint64
// receivedSorterCommitTs is the commit ts received from the sorter.
// We use this to statistics the latency of the table sorter.
receivedSorterCommitTs atomic.Uint64
// receivedEventCount is the number of events received from the sorter.
receivedEventCount atomic.Int64
}

func newTableSinkWrapper(
Expand Down Expand Up @@ -89,6 +94,12 @@ func (t *tableSinkWrapper) updateReceivedSorterResolvedTs(ts model.Ts) {
t.receivedSorterResolvedTs.Store(ts)
}

func (t *tableSinkWrapper) updateReceivedSorterCommitTs(ts model.Ts) {
if ts > t.receivedSorterCommitTs.Load() {
Rustin170506 marked this conversation as resolved.
Show resolved Hide resolved
t.receivedSorterCommitTs.Store(ts)
}
}

func (t *tableSinkWrapper) updateResolvedTs(ts model.ResolvedTs) error {
if err := t.tableSink.UpdateResolvedTs(ts); err != nil {
return errors.Trace(err)
Expand All @@ -104,6 +115,14 @@ func (t *tableSinkWrapper) getReceivedSorterResolvedTs() model.Ts {
return t.receivedSorterResolvedTs.Load()
}

func (t *tableSinkWrapper) getReceivedSorterCommitTs() model.Ts {
return t.receivedSorterCommitTs.Load()
}

func (t *tableSinkWrapper) getReceivedEventCount() int64 {
return t.receivedEventCount.Load()
}

func (t *tableSinkWrapper) getState() tablepb.TableState {
return t.state.Load()
}
Expand Down
12 changes: 12 additions & 0 deletions cdc/processor/sourcemanager/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ type SortEngine interface {
// NOTE: It's only available if IsTableBased returns false.
CleanAllTables(upperBound Position) error

// GetStatsByTable gets the statistics of the given table.
GetStatsByTable(tableID model.TableID) TableStats

// ReceivedEvents returns the number of events received by the sort engine.
ReceivedEvents() int64

// Close closes the engine. All data written by this instance can be deleted.
//
// NOTE: it leads an undefined behavior to close an engine with active iterators.
Expand Down Expand Up @@ -129,3 +135,9 @@ func (p Position) Compare(q Position) int {
return 1
}
}

// TableStats of a sort engine.
type TableStats struct {
ReceivedMaxCommitTs model.Ts
ReceivedMaxResolvedTs model.Ts
}
34 changes: 23 additions & 11 deletions cdc/processor/sourcemanager/engine/memory/event_sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,26 +50,26 @@ func New(_ context.Context) *EventSorter {
return &EventSorter{}
}

// IsTableBased implements sorter.EventSortEngine.
// IsTableBased implements engine.SortEngine.
func (s *EventSorter) IsTableBased() bool {
return true
}

// AddTable implements sorter.EventSortEngine.
// AddTable implements engine.SortEngine.
func (s *EventSorter) AddTable(tableID model.TableID) {
if _, exists := s.tables.LoadOrStore(tableID, &tableSorter{}); exists {
log.Panic("add an exist table", zap.Int64("tableID", tableID))
}
}

// RemoveTable implements sorter.EventSortEngine.
// RemoveTable implements engine.SortEngine.
func (s *EventSorter) RemoveTable(tableID model.TableID) {
if _, exists := s.tables.LoadAndDelete(tableID); !exists {
log.Panic("remove an unexist table", zap.Int64("tableID", tableID))
}
}

// Add implements sorter.EventSortEngine.
// Add implements engine.SortEngine.
func (s *EventSorter) Add(tableID model.TableID, events ...*model.PolymorphicEvent) (err error) {
value, exists := s.tables.Load(tableID)
if !exists {
Expand All @@ -87,7 +87,7 @@ func (s *EventSorter) Add(tableID model.TableID, events ...*model.PolymorphicEve
return nil
}

// GetResolvedTs implements sorter.EventSortEngine.
// GetResolvedTs implements engine.SortEngine.
func (s *EventSorter) GetResolvedTs(tableID model.TableID) model.Ts {
value, exists := s.tables.Load(tableID)
if !exists {
Expand All @@ -97,14 +97,14 @@ func (s *EventSorter) GetResolvedTs(tableID model.TableID) model.Ts {
return value.(*tableSorter).getResolvedTs()
}

// OnResolve implements sorter.EventSortEngine.
// OnResolve implements engine.SortEngine.
func (s *EventSorter) OnResolve(action func(model.TableID, model.Ts)) {
s.mu.Lock()
defer s.mu.Unlock()
s.onResolves = append(s.onResolves, action)
}

// FetchByTable implements sorter.EventSortEngine.
// FetchByTable implements engine.SortEngine.
func (s *EventSorter) FetchByTable(tableID model.TableID, lowerBound, upperBound engine.Position) engine.EventIterator {
value, exists := s.tables.Load(tableID)
if !exists {
Expand All @@ -114,13 +114,13 @@ func (s *EventSorter) FetchByTable(tableID model.TableID, lowerBound, upperBound
return value.(*tableSorter).fetch(tableID, lowerBound, upperBound)
}

// FetchAllTables implements sorter.EventSortEngine.
// FetchAllTables implements engine.SortEngine.
func (s *EventSorter) FetchAllTables(lowerBound engine.Position) engine.EventIterator {
log.Panic("FetchAllTables should never be called")
return nil
}

// CleanByTable implements sorter.EventSortEngine.
// CleanByTable implements engine.SortEngine.
func (s *EventSorter) CleanByTable(tableID model.TableID, upperBound engine.Position) error {
value, exists := s.tables.Load(tableID)
if !exists {
Expand All @@ -131,13 +131,25 @@ func (s *EventSorter) CleanByTable(tableID model.TableID, upperBound engine.Posi
return nil
}

// CleanAllTables implements sorter.EventSortEngine.
// CleanAllTables implements engine.SortEngine.
func (s *EventSorter) CleanAllTables(upperBound engine.Position) error {
log.Panic("CleanAllTables should never be called")
return nil
}

// Close implements sorter.EventSortEngine.
// GetStatsByTable implements engine.SortEngine.
func (s *EventSorter) GetStatsByTable(tableID model.TableID) engine.TableStats {
log.Panic("GetStatsByTable should never be called")
return engine.TableStats{}
}

// ReceivedEvents implements engine.SortEngine.
func (s *EventSorter) ReceivedEvents() int64 {
log.Panic("ReceivedEvents should never be called")
return 0
}

// Close implements engine.SortEngine.
func (s *EventSorter) Close() error {
s.tables = sync.Map{}
return nil
Expand Down
28 changes: 28 additions & 0 deletions cdc/processor/sourcemanager/engine/mock/engine_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading