owner(cdc): fix two metrics problems (pingcap#4703) (pingcap#4729)
ti-chi-bot authored Apr 24, 2022
1 parent 383b7a0 commit 1a35190
Showing 11 changed files with 205 additions and 35 deletions.
18 changes: 11 additions & 7 deletions cdc/capture/capture.go
@@ -25,6 +25,13 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/clientv3/concurrency"
"go.etcd.io/etcd/mvcc"
"go.uber.org/zap"
"golang.org/x/time/rate"

"github.com/pingcap/tiflow/cdc/kv"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/owner"
@@ -39,12 +46,6 @@ import (
"github.com/pingcap/tiflow/pkg/p2p"
"github.com/pingcap/tiflow/pkg/pdtime"
"github.com/pingcap/tiflow/pkg/version"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/clientv3/concurrency"
"go.etcd.io/etcd/mvcc"
"go.uber.org/zap"
"golang.org/x/time/rate"
)

// Capture represents a Capture server, it monitors the changefeed information in etcd and schedules Task on it.
@@ -140,7 +141,10 @@ func (c *Capture) reset(ctx context.Context) error {
if c.TimeAcquirer != nil {
c.TimeAcquirer.Stop()
}
c.TimeAcquirer = pdtime.NewTimeAcquirer(c.pdClient)
c.TimeAcquirer, err = pdtime.NewTimeAcquirer(ctx, c.pdClient)
if err != nil {
return errors.Trace(err)
}

if c.tableActorSystem != nil {
err := c.tableActorSystem.Stop()
40 changes: 26 additions & 14 deletions cdc/owner/changefeed.go
@@ -24,6 +24,10 @@ import (
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/format"
timodel "github.com/pingcap/tidb/parser/model"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/redo"
schedulerv2 "github.com/pingcap/tiflow/cdc/scheduler"
@@ -32,9 +36,6 @@ import (
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/pingcap/tiflow/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
)

type changefeed struct {
@@ -189,18 +190,25 @@ func (c *changefeed) tick(ctx cdcContext.Context, state *orchestrator.Changefeed
if err != nil {
return errors.Trace(err)
}

pdTime, _ := ctx.GlobalVars().TimeAcquirer.CurrentTimeFromCached()
currentTs := oracle.GetPhysical(pdTime)

// CheckpointCannotProceed implies that not all tables are being replicated normally,
// so in that case there is no need to advance the global watermarks.
if newCheckpointTs != schedulerv2.CheckpointCannotProceed {
pdTime, _ := ctx.GlobalVars().TimeAcquirer.CurrentTimeFromCached()
currentTs := oracle.GetPhysical(pdTime)
if newResolvedTs > barrierTs {
newResolvedTs = barrierTs
}
if newCheckpointTs > barrierTs {
newCheckpointTs = barrierTs
}
c.updateStatus(currentTs, newCheckpointTs, newResolvedTs)
c.updateStatus(newCheckpointTs, newResolvedTs)
c.updateMetrics(currentTs, newCheckpointTs, newResolvedTs)
} else if c.state.Status != nil {
// We should keep the metrics updated even if the scheduler cannot
// advance the watermarks for now.
c.updateMetrics(currentTs, c.state.Status.CheckpointTs, c.state.Status.ResolvedTs)
}
return nil
}
@@ -522,7 +530,17 @@ func (c *changefeed) asyncExecDDL(ctx cdcContext.Context, job *timodel.Job) (don
return done, nil
}

func (c *changefeed) updateStatus(currentTs int64, checkpointTs, resolvedTs model.Ts) {
func (c *changefeed) updateMetrics(currentTs int64, checkpointTs, resolvedTs model.Ts) {
phyCkpTs := oracle.ExtractPhysical(checkpointTs)
c.metricsChangefeedCheckpointTsGauge.Set(float64(phyCkpTs))
c.metricsChangefeedCheckpointTsLagGauge.Set(float64(currentTs-phyCkpTs) / 1e3)

phyRTs := oracle.ExtractPhysical(resolvedTs)
c.metricsChangefeedResolvedTsGauge.Set(float64(phyRTs))
c.metricsChangefeedResolvedTsLagGauge.Set(float64(currentTs-phyRTs) / 1e3)
}

func (c *changefeed) updateStatus(checkpointTs, resolvedTs model.Ts) {
c.state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
changed := false
if status == nil {
@@ -538,19 +556,13 @@ func (c *changefeed) updateStatus(currentTs int64, checkpointTs, resolvedTs mode
}
return status, changed, nil
})
phyCkpTs := oracle.ExtractPhysical(checkpointTs)
c.metricsChangefeedCheckpointTsGauge.Set(float64(phyCkpTs))
c.metricsChangefeedCheckpointTsLagGauge.Set(float64(currentTs-phyCkpTs) / 1e3)

phyRTs := oracle.ExtractPhysical(resolvedTs)
c.metricsChangefeedResolvedTsGauge.Set(float64(phyRTs))
c.metricsChangefeedResolvedTsLagGauge.Set(float64(currentTs-phyRTs) / 1e3)
}

func (c *changefeed) Close(ctx cdcContext.Context) {
c.releaseResources(ctx)
}

// GetInfoProvider returns an InfoProvider if one is available.
func (c *changefeed) GetInfoProvider() schedulerv2.InfoProvider {
if provider, ok := c.scheduler.(schedulerv2.InfoProvider); ok {
return provider
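The first of the two metrics problems is visible in the changefeed.go hunk above: the checkpoint and resolved-ts lag gauges were only refreshed inside updateStatus, and the tick skipped that call entirely whenever the scheduler returned CheckpointCannotProceed, so the lag metrics froze. The patch splits the gauge updates into updateMetrics and also calls it on the else branch. Below is a minimal, self-contained sketch of the lag arithmetic those gauges use; physicalFromTSO stands in for oracle.ExtractPhysical (a TSO keeps its physical timestamp, in milliseconds, above the 18-bit logical counter), and all names and values are illustrative only, not the real TiCDC code.

package main

import (
	"fmt"
	"time"
)

// physicalFromTSO extracts the physical part (Unix milliseconds) of a TSO.
// The low 18 bits are the logical counter; this mirrors what
// oracle.ExtractPhysical does in the patch above.
func physicalFromTSO(ts uint64) int64 {
	return int64(ts >> 18)
}

// lagSeconds reproduces the gauge value set by updateMetrics:
// (current PD physical time - checkpoint physical time) in seconds.
func lagSeconds(currentPhysicalMs int64, checkpointTs uint64) float64 {
	return float64(currentPhysicalMs-physicalFromTSO(checkpointTs)) / 1e3
}

func main() {
	// Hypothetical values for illustration only.
	nowMs := time.Now().UnixMilli()
	// A checkpoint TSO whose physical part lies 5 seconds in the past.
	checkpointTs := uint64(nowMs-5000) << 18
	fmt.Printf("checkpoint lag: %.1fs\n", lagSeconds(nowMs, checkpointTs)) // ~5.0s
}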
33 changes: 33 additions & 0 deletions cdc/owner/owner.go
@@ -26,6 +26,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/orchestrator"
@@ -306,6 +307,38 @@ func (o *Owner) updateMetrics(state *orchestrator.GlobalReactorState) {

ownerMaintainTableNumGauge.Reset()
changefeedStatusGauge.Reset()

conf := config.GetGlobalServerConfig()

// TODO refactor this piece of code when the new scheduler is stabilized,
// and the old scheduler is removed.
if conf.Debug != nil && conf.Debug.EnableNewScheduler {
for cfID, cf := range o.changefeeds {
if cf.state != nil && cf.state.Info != nil {
changefeedStatusGauge.WithLabelValues(cfID).Set(float64(cf.state.Info.State.ToInt()))
}

// The InfoProvider is a proxy object returning information
// from the scheduler.
infoProvider := cf.GetInfoProvider()
if infoProvider == nil {
// The scheduler has not been initialized yet.
continue
}

totalCounts := infoProvider.GetTotalTableCounts()
pendingCounts := infoProvider.GetPendingTableCounts()

for captureID, info := range o.captures {
ownerMaintainTableNumGauge.WithLabelValues(
cfID, info.AdvertiseAddr, maintainTableTypeTotal).Set(float64(totalCounts[captureID]))
ownerMaintainTableNumGauge.WithLabelValues(
cfID, info.AdvertiseAddr, maintainTableTypeWip).Set(float64(pendingCounts[captureID]))
}
}
return
}

for changefeedID, changefeedState := range state.Changefeeds {
for captureID, captureInfo := range state.Captures {
taskStatus, exist := changefeedState.TaskStatuses[captureID]
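This owner.go hunk addresses the second problem: with the new scheduler enabled, changefeedStatusGauge and ownerMaintainTableNumGauge are now populated from the scheduler's InfoProvider instead of the per-capture task statuses, and the function returns before the old-scheduler path. The snippet below sketches the same GaugeVec pattern in isolation; the metric name, label values, and count maps are made up for illustration and are not the real TiCDC metric definitions.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// A sketch of the pattern used above: one GaugeVec labelled by changefeed,
// capture address, and table type, reset and repopulated on every metrics pass.
var maintainTableNum = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "ticdc",
		Subsystem: "owner",
		Name:      "maintain_table_num_example",
		Help:      "number of tables maintained per capture (sketch)",
	},
	[]string{"changefeed", "capture", "type"},
)

func main() {
	prometheus.MustRegister(maintainTableNum)

	// Stand-ins for GetTotalTableCounts / GetPendingTableCounts results.
	totals := map[string]int{"capture-1": 3, "capture-2": 2}
	pendings := map[string]int{"capture-1": 1, "capture-2": 1}

	maintainTableNum.Reset()
	for capture, n := range totals {
		maintainTableNum.WithLabelValues("cf-1", capture, "total").Set(float64(n))
		maintainTableNum.WithLabelValues("cf-1", capture, "wip").Set(float64(pendings[capture]))
	}
	fmt.Println("gauges populated")
}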
6 changes: 6 additions & 0 deletions cdc/owner/scheduler_test.go
@@ -261,6 +261,12 @@ func TestSchedulerNoPeer(t *testing.T) {
mockCluster.Close()
}

func TestInfoProvider(t *testing.T) {
sched := scheduler(new(schedulerV2))
_, ok := sched.(pscheduler.InfoProvider)
require.True(t, ok)
}

func receiveToChannels(
ctx context.Context,
t *testing.T,
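TestInfoProvider above, like changefeed.GetInfoProvider earlier in the diff, relies on a plain type assertion: only the new scheduler implements InfoProvider, so the owner skips the count collection when the old scheduler is in use. A self-contained sketch of that optional-interface pattern follows; every type here is a stand-in, not the real scheduler.

package main

import "fmt"

// infoProvider is a stand-in for the scheduler.InfoProvider interface.
type infoProvider interface {
	GetTotalTableCounts() map[string]int
}

type newScheduler struct{}

func (newScheduler) GetTotalTableCounts() map[string]int {
	return map[string]int{"capture-1": 3}
}

// oldScheduler deliberately does not implement infoProvider.
type oldScheduler struct{}

func collect(sched interface{}) {
	provider, ok := sched.(infoProvider)
	if !ok {
		fmt.Println("scheduler does not expose table counts; skipping")
		return
	}
	fmt.Println("total tables:", provider.GetTotalTableCounts())
}

func main() {
	collect(newScheduler{})
	collect(oldScheduler{})
}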
40 changes: 40 additions & 0 deletions cdc/scheduler/info_provider.go
@@ -26,9 +26,20 @@ type InfoProvider interface {

// GetTaskPositions returns the task positions.
GetTaskPositions() (map[model.CaptureID]*model.TaskPosition, error)

// GetTotalTableCounts returns the number of tables associated
// with each capture.
GetTotalTableCounts() map[model.CaptureID]int

// GetPendingTableCounts returns the number of tables in a non-ready
// status (Adding & Removing) associated with each capture.
GetPendingTableCounts() map[model.CaptureID]int
}

// GetTaskStatuses implements InfoProvider for BaseScheduleDispatcher.
// Complexity Note: This function has O(#tables) cost. USE WITH CARE.
// Functions with cost O(#tables) are NOT recommended for regular metrics
// collection.
func (s *BaseScheduleDispatcher) GetTaskStatuses() (map[model.CaptureID]*model.TaskStatus, error) {
s.mu.Lock()
defer s.mu.Unlock()
@@ -67,6 +78,7 @@ func (s *BaseScheduleDispatcher) GetTaskStatuses() (map[model.CaptureID]*model.T
}

// GetTaskPositions implements InfoProvider for BaseScheduleDispatcher.
// Complexity Note: This function has O(#captures) cost.
func (s *BaseScheduleDispatcher) GetTaskPositions() (map[model.CaptureID]*model.TaskPosition, error) {
s.mu.Lock()
defer s.mu.Unlock()
@@ -81,3 +93,31 @@ func (s *BaseScheduleDispatcher) GetTaskPositions() (map[model.CaptureID]*model.

return ret, nil
}

// GetTotalTableCounts implements InfoProvider for BaseScheduleDispatcher.
// Complexity Note: This function has O(#captures) cost.
func (s *BaseScheduleDispatcher) GetTotalTableCounts() map[model.CaptureID]int {
s.mu.Lock()
defer s.mu.Unlock()

ret := make(map[model.CaptureID]int, len(s.captureStatus))
for captureID := range s.captureStatus {
ret[captureID] = s.tables.CountTableByCaptureID(captureID)
}
return ret
}

// GetPendingTableCounts implements InfoProvider for BaseScheduleDispatcher.
// Complexity Note: This function has O(#captures) cost.
func (s *BaseScheduleDispatcher) GetPendingTableCounts() map[model.CaptureID]int {
s.mu.Lock()
defer s.mu.Unlock()

ret := make(map[model.CaptureID]int, len(s.captureStatus))
for captureID := range s.captureStatus {
addCount := s.tables.CountTableByCaptureIDAndStatus(captureID, util.AddingTable)
removeCount := s.tables.CountTableByCaptureIDAndStatus(captureID, util.RemovingTable)
ret[captureID] = addCount + removeCount
}
return ret
}
15 changes: 15 additions & 0 deletions cdc/scheduler/info_provider_test.go
@@ -116,3 +116,18 @@ func TestInfoProviderTaskPosition(t *testing.T) {
},
}, taskPosition)
}

func TestInfoProviderTableCounts(t *testing.T) {
dispatcher := NewBaseScheduleDispatcher("cf-1", nil, 1300)
injectSchedulerStateForInfoProviderTest(dispatcher)

require.Equal(t, map[model.CaptureID]int{
"capture-1": 3,
"capture-2": 2,
}, dispatcher.GetTotalTableCounts())

require.Equal(t, map[model.CaptureID]int{
"capture-1": 1,
"capture-2": 1,
}, dispatcher.GetPendingTableCounts())
}
43 changes: 38 additions & 5 deletions cdc/scheduler/util/table_set.go
@@ -28,6 +28,12 @@ type TableSet struct {
// a non-unique index to facilitate looking up tables
// assigned to a given capture.
captureIndex map[model.CaptureID]map[model.TableID]*TableRecord

// caches the number of tables in each status associated with
// the given capture.
// This is used to accelerate scheduler decision and metrics
// collection when the number of tables is very high.
statusCounts map[model.CaptureID]map[TableStatus]int
}

// TableRecord is a record to be inserted into TableSet.
@@ -62,6 +68,7 @@ func NewTableSet() *TableSet {
return &TableSet{
tableIDMap: map[model.TableID]*TableRecord{},
captureIndex: map[model.CaptureID]map[model.TableID]*TableRecord{},
statusCounts: map[model.CaptureID]map[TableStatus]int{},
}
}

@@ -80,8 +87,15 @@ func (s *TableSet) AddTableRecord(record *TableRecord) (successful bool) {
captureIndexEntry = make(map[model.TableID]*TableRecord)
s.captureIndex[record.CaptureID] = captureIndexEntry
}

captureIndexEntry[record.TableID] = recordCloned

statusCountEntry := s.statusCounts[record.CaptureID]
if statusCountEntry == nil {
statusCountEntry = make(map[TableStatus]int)
s.statusCounts[record.CaptureID] = statusCountEntry
}
statusCountEntry[record.Status]++

return true
}

@@ -100,6 +114,9 @@ func (s *TableSet) UpdateTableRecord(record *TableRecord) (successful bool) {
recordCloned := record.Clone()
s.tableIDMap[record.TableID] = recordCloned
s.captureIndex[record.CaptureID][record.TableID] = recordCloned

s.statusCounts[record.CaptureID][oldRecord.Status]--
s.statusCounts[record.CaptureID][record.Status]++
return true
}

@@ -142,6 +159,13 @@ func (s *TableSet) RemoveTableRecord(tableID model.TableID) bool {
if len(captureIndexEntry) == 0 {
delete(s.captureIndex, record.CaptureID)
}

statusCountEntry, ok := s.statusCounts[record.CaptureID]
if !ok {
log.Panic("unreachable", zap.Int64("tableID", tableID))
}
statusCountEntry[record.Status]--

return true
}

@@ -161,6 +185,7 @@ func (s *TableSet) RemoveTableRecordByCaptureID(captureID model.CaptureID) []*Ta
ret = append(ret, record)
}
delete(s.captureIndex, captureID)
delete(s.statusCounts, captureID)
return ret
}

@@ -169,6 +194,16 @@ func (s *TableSet) CountTableByCaptureID(captureID model.CaptureID) int {
return len(s.captureIndex[captureID])
}

// CountTableByCaptureIDAndStatus counts the number of tables associated with the given captureID
// with the specified status.
func (s *TableSet) CountTableByCaptureIDAndStatus(captureID model.CaptureID, status TableStatus) int {
statusCountEntry, ok := s.statusCounts[captureID]
if !ok {
return 0
}
return statusCountEntry[status]
}

// GetDistinctCaptures counts distinct captures with tables.
func (s *TableSet) GetDistinctCaptures() []model.CaptureID {
var ret []model.CaptureID
@@ -202,10 +237,8 @@ func (s *TableSet) GetAllTablesGroupedByCaptures() map[model.CaptureID]map[model

// CountTableByStatus counts the number of tables with the given status.
func (s *TableSet) CountTableByStatus(status TableStatus) (count int) {
for _, record := range s.tableIDMap {
if record.Status == status {
count++
}
for _, statusEntryCount := range s.statusCounts {
count += statusEntryCount[status]
}
return
}
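To keep these per-capture, per-status counts cheap enough to collect on every tick, TableSet now maintains a statusCounts cache that is adjusted on every add, update, and removal, so CountTableByCaptureIDAndStatus is a map lookup and CountTableByStatus iterates captures rather than all tables. Below is a stripped-down sketch of the same bookkeeping with simplified types; it is not the real TableSet API.

package main

import "fmt"

type tableStatus int

const (
	addingTable tableStatus = iota
	runningTable
	removingTable
)

type record struct {
	table   int64
	capture string
	status  tableStatus
}

// counterSet mirrors the statusCounts idea: every mutation keeps a
// per-capture, per-status counter in sync with the record map, so status
// counts never require scanning every table record.
type counterSet struct {
	records      map[int64]record
	statusCounts map[string]map[tableStatus]int
}

func newCounterSet() *counterSet {
	return &counterSet{
		records:      map[int64]record{},
		statusCounts: map[string]map[tableStatus]int{},
	}
}

func (s *counterSet) bump(capture string, st tableStatus, delta int) {
	if s.statusCounts[capture] == nil {
		s.statusCounts[capture] = map[tableStatus]int{}
	}
	s.statusCounts[capture][st] += delta
}

func (s *counterSet) add(r record) {
	s.records[r.table] = r
	s.bump(r.capture, r.status, 1)
}

func (s *counterSet) update(r record) {
	old := s.records[r.table]
	s.bump(old.capture, old.status, -1)
	s.records[r.table] = r
	s.bump(r.capture, r.status, 1)
}

// pending mirrors CountTableByCaptureIDAndStatus for Adding plus Removing.
func (s *counterSet) pending(capture string) int {
	return s.statusCounts[capture][addingTable] + s.statusCounts[capture][removingTable]
}

func main() {
	s := newCounterSet()
	s.add(record{table: 1, capture: "capture-1", status: addingTable})
	s.add(record{table: 2, capture: "capture-1", status: runningTable})
	fmt.Println(s.pending("capture-1")) // 1: table 1 is still being added
	s.update(record{table: 1, capture: "capture-1", status: runningTable})
	fmt.Println(s.pending("capture-1")) // 0: nothing pending any more
}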