tp: add metrics #5823

Merged · 4 commits · Jun 10, 2022
3 changes: 2 additions & 1 deletion cdc/metrics.go
@@ -20,6 +20,7 @@ import (
"github.com/pingcap/tiflow/cdc/processor"
"github.com/pingcap/tiflow/cdc/puller"
redowriter "github.com/pingcap/tiflow/cdc/redo/writer"
"github.com/pingcap/tiflow/cdc/scheduler"
sink "github.com/pingcap/tiflow/cdc/sink/metrics"
"github.com/pingcap/tiflow/cdc/sink/mq/producer/kafka"
"github.com/pingcap/tiflow/cdc/sorter"
@@ -54,14 +55,14 @@ func init() {
actor.InitMetrics(registry)
orchestrator.InitMetrics(registry)
p2p.InitMetrics(registry)
// Sorter metrics
sorter.InitMetrics(registry)
memory.InitMetrics(registry)
unified.InitMetrics(registry)
leveldb.InitMetrics(registry)
redowriter.InitMetrics(registry)
db.InitMetrics(registry)
kafka.InitMetrics(registry)
+ scheduler.InitMetrics(registry)
// TiKV client metrics, including metrics about resolved and region cache.
originalRegistry := prometheus.DefaultRegisterer
prometheus.DefaultRegisterer = registry
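Context for readers outside this file: every `InitMetrics(registry)` call in the `init()` above follows the same convention, and the new `scheduler.InitMetrics` presumably just registers the collectors this PR uses further down (`captureTableGauge`, `scheduleTaskCounter`). A minimal sketch of that convention; the namespace, metric names, and label sets below are assumptions, not taken from this diff:

```go
package scheduler

import "github.com/prometheus/client_golang/prometheus"

var (
	// Per-capture table counts; the labels mirror the WithLabelValues
	// call in capture_manager.go below. Names here are illustrative.
	captureTableGauge = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: "ticdc",
			Subsystem: "scheduler",
			Name:      "capture_table_count",
			Help:      "The number of tables running on a capture.",
		}, []string{"namespace", "changefeed", "capture"})

	// Per-(scheduler, task) counts; the labels mirror coordinator.go below.
	scheduleTaskCounter = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "ticdc",
			Subsystem: "scheduler",
			Name:      "task_count",
			Help:      "The total number of generated schedule tasks.",
		}, []string{"namespace", "changefeed", "scheduler", "task"})
)

// InitMetrics registers this package's collectors with the given registry,
// matching the pattern of the other InitMetrics calls in cdc/metrics.go.
func InitMetrics(registry *prometheus.Registry) {
	registry.MustRegister(captureTableGauge)
	registry.MustRegister(scheduleTaskCounter)
}
```

Taking the registry explicitly also sidesteps the `prometheus.DefaultRegisterer` swap that `init()` above performs only for the TiKV client metrics.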
26 changes: 21 additions & 5 deletions cdc/scheduler/internal/tp/capture_manager.go
@@ -57,10 +57,11 @@ type CaptureStatus struct {
Epoch schedulepb.ProcessorEpoch
State CaptureState
Tables []schedulepb.TableStatus
+ Addr string
}

- func newCaptureStatus(rev schedulepb.OwnerRevision) *CaptureStatus {
- return &CaptureStatus{OwnerRev: rev, State: CaptureStateUninitialized}
+ func newCaptureStatus(rev schedulepb.OwnerRevision, addr string) *CaptureStatus {
+ return &CaptureStatus{OwnerRev: rev, State: CaptureStateUninitialized, Addr: addr}
}

func (c *CaptureStatus) handleHeartbeatResponse(
@@ -100,13 +101,19 @@ type captureManager struct {
// A logical clock counter, for heartbeat.
tickCounter int
heartbeatTick int

+ changefeedID model.ChangeFeedID
}

- func newCaptureManager(rev schedulepb.OwnerRevision, heartbeatTick int) *captureManager {
+ func newCaptureManager(
+ changefeedID model.ChangeFeedID, rev schedulepb.OwnerRevision, heartbeatTick int,
+ ) *captureManager {
return &captureManager{
OwnerRev: rev,
Captures: make(map[model.CaptureID]*CaptureStatus),
heartbeatTick: heartbeatTick,

+ changefeedID: changefeedID,
}
}

@@ -178,10 +185,10 @@ func (c *captureManager) HandleAliveCaptureUpdate(
aliveCaptures map[model.CaptureID]*model.CaptureInfo,
) []*schedulepb.Message {
msgs := make([]*schedulepb.Message, 0)
- for id := range aliveCaptures {
+ for id, info := range aliveCaptures {
if _, ok := c.Captures[id]; !ok {
// A new capture.
- c.Captures[id] = newCaptureStatus(c.OwnerRev)
+ c.Captures[id] = newCaptureStatus(c.OwnerRev, info.AdvertiseAddr)
log.Info("tpscheduler: find a new capture", zap.String("capture", id))
msgs = append(msgs, &schedulepb.Message{
To: id,
@@ -232,3 +239,12 @@ func (c *captureManager) TakeChanges() *captureChanges {
c.changes = nil
return changes
}

+ func (c *captureManager) CollectMetrics() {
+ cf := c.changefeedID
+ for _, capture := range c.Captures {
+ captureTableGauge.
+ WithLabelValues(cf.Namespace, cf.ID, capture.Addr).
+ Set(float64(len(capture.Tables)))
+ }
+ }
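`CollectMetrics` exports one gauge sample per capture, keyed by the advertise address recorded when the capture was first seen. Roughly how that behaves, as an illustrative in-package test; the `testutil` readback and the import paths are assumptions, not part of this diff:

```go
package tp

import (
	"testing"

	"github.com/pingcap/tiflow/cdc/model"
	"github.com/pingcap/tiflow/cdc/scheduler/internal/tp/schedulepb"
	"github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/stretchr/testify/require"
)

func TestCollectMetricsSketch(t *testing.T) {
	cm := newCaptureManager(
		model.ChangeFeedID{Namespace: "default", ID: "cf"},
		schedulepb.OwnerRevision{}, 2)
	// One capture running three tables.
	cm.Captures["a"] = &CaptureStatus{
		Addr:   "127.0.0.1:8300",
		Tables: make([]schedulepb.TableStatus, 3),
	}
	cm.CollectMetrics()
	// The gauge should now report 3 for this (changefeed, capture) pair.
	require.Equal(t, 3.0, testutil.ToFloat64(
		captureTableGauge.WithLabelValues("default", "cf", "127.0.0.1:8300")))
}
```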
8 changes: 4 additions & 4 deletions cdc/scheduler/internal/tp/capture_manager_test.go
@@ -26,7 +26,7 @@ func TestCaptureStatusHandleHeartbeatResponse(t *testing.T) {

rev := schedulepb.OwnerRevision{Revision: 1}
epoch := schedulepb.ProcessorEpoch{Epoch: "test"}
- c := newCaptureStatus(rev)
+ c := newCaptureStatus(rev, "")
require.Equal(t, CaptureStateUninitialized, c.State)

// Uninitialized -> Initialized
@@ -50,7 +50,7 @@ func TestCaptureManagerHandleAliveCaptureUpdate(t *testing.T) {
t.Parallel()

rev := schedulepb.OwnerRevision{}
- cm := newCaptureManager(rev, 2)
+ cm := newCaptureManager(model.ChangeFeedID{}, rev, 2)
ms := map[model.CaptureID]*model.CaptureInfo{
"1": {}, "2": {}, "3": {},
}
@@ -120,7 +120,7 @@ func TestCaptureManagerHandleMessages(t *testing.T) {
"1": {},
"2": {},
}
- cm := newCaptureManager(rev, 2)
+ cm := newCaptureManager(model.ChangeFeedID{}, rev, 2)
require.False(t, cm.CheckAllCaptureInitialized())

// Initial handle alive captures.
@@ -168,7 +168,7 @@ func TestCaptureManagerTick(t *testing.T) {
t.Parallel()

rev := schedulepb.OwnerRevision{}
- cm := newCaptureManager(rev, 2)
+ cm := newCaptureManager(model.ChangeFeedID{}, rev, 2)

// No heartbeat if there is no capture.
msgs := cm.Tick(nil)
51 changes: 45 additions & 6 deletions cdc/scheduler/internal/tp/coordinator.go
@@ -16,6 +16,7 @@ package tp
import (
"context"
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
@@ -28,7 +29,10 @@ import (
"go.uber.org/zap"
)

- const checkpointCannotProceed = internal.CheckpointCannotProceed
+ const (
+ checkpointCannotProceed = internal.CheckpointCannotProceed
+ metricsInterval = 10 * time.Second
+ )

var _ internal.Scheduler = (*coordinator)(nil)

@@ -40,30 +44,35 @@ type coordinator struct {
schedulers map[schedulerType]scheduler
replicationM *replicationManager
captureM *captureManager

+ lastCollectTime time.Time
+ changefeedID model.ChangeFeedID
+ tasksCounter map[struct{ scheduler, task string }]int
}

// NewCoordinator returns a two phase scheduler.
func NewCoordinator(
ctx context.Context,
captureID model.CaptureID,
- changeFeedID model.ChangeFeedID,
+ changefeedID model.ChangeFeedID,
checkpointTs model.Ts,
messageServer *p2p.MessageServer,
messageRouter p2p.MessageRouter,
ownerRevision int64,
cfg *config.SchedulerConfig,
) (internal.Scheduler, error) {
- trans, err := newTransport(ctx, changeFeedID, schedulerRole, messageServer, messageRouter)
+ trans, err := newTransport(ctx, changefeedID, schedulerRole, messageServer, messageRouter)
if err != nil {
return nil, errors.Trace(err)
}
- coord := newCoordinator(captureID, ownerRevision, cfg)
+ coord := newCoordinator(captureID, changefeedID, ownerRevision, cfg)
coord.trans = trans
return coord, nil
}

func newCoordinator(
captureID model.CaptureID,
+ changefeedID model.ChangeFeedID,
ownerRevision int64,
cfg *config.SchedulerConfig,
) *coordinator {
@@ -78,8 +87,12 @@ func newCoordinator(
revision: revision,
captureID: captureID,
schedulers: schedulers,
- replicationM: newReplicationManager(cfg.MaxTaskConcurrency),
- captureM: newCaptureManager(revision, cfg.HeartbeatTick),
+ replicationM: newReplicationManager(cfg.MaxTaskConcurrency, changefeedID),
+ captureM: newCaptureManager(changefeedID, revision, cfg.HeartbeatTick),
+ tasksCounter: make(map[struct {
+ scheduler string
+ task string
+ }]int),
}
}

@@ -190,6 +203,12 @@ func (c *coordinator) poll(
zap.String("scheduler", scheduler.Name()))
}
allTasks = append(allTasks, tasks...)
+ for _, t := range tasks {
+ name := struct {
+ scheduler, task string
+ }{scheduler: scheduler.Name(), task: t.Name()}
+ c.tasksCounter[name]++
+ }
}

// Handle generated schedule tasks.
@@ -205,6 +224,8 @@ func (c *coordinator) poll(
return checkpointCannotProceed, checkpointCannotProceed, errors.Trace(err)
}

+ c.maybeCollectMetrics()

// Checkpoint calculation
newCheckpointTs, newResolvedTs = c.replicationM.AdvanceCheckpoint(currentTables)
return newCheckpointTs, newResolvedTs, nil
@@ -250,3 +271,21 @@ func (c *coordinator) sendMsgs(ctx context.Context, msgs []*schedulepb.Message)
}
return c.trans.Send(ctx, msgs)
}

+ func (c *coordinator) maybeCollectMetrics() {
+ now := time.Now()
+ if now.Sub(c.lastCollectTime) < metricsInterval {
+ return
+ }
+ c.lastCollectTime = now
+
+ cf := c.replicationM.changefeedID
+ for name, counter := range c.tasksCounter {
+ scheduleTaskCounter.
+ WithLabelValues(cf.Namespace, cf.ID, name.scheduler, name.task).
+ Add(float64(counter))
+ c.tasksCounter[name] = 0
+ }
+ c.replicationM.CollectMetrics()
+ c.captureM.CollectMetrics()
+ }
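The pattern in `maybeCollectMetrics` is a throttled flush: task counts accumulate in a plain map on the hot poll path, the Prometheus counters are only touched every `metricsInterval` (10s), and the map is zeroed after each flush so `Add` keeps the exported counters cumulative. A standalone sketch of the same pattern; everything below is illustrative, not tiflow API:

```go
package main

import (
	"fmt"
	"time"
)

// throttledFlusher accumulates counts cheaply and emits them at most
// once per interval, mirroring coordinator.maybeCollectMetrics above.
type throttledFlusher struct {
	lastFlush time.Time
	interval  time.Duration
	pending   map[string]int
}

func (f *throttledFlusher) bump(task string) { f.pending[task]++ }

func (f *throttledFlusher) maybeFlush(emit func(task string, n int)) {
	now := time.Now()
	if now.Sub(f.lastFlush) < f.interval {
		return // too soon; keep accumulating
	}
	f.lastFlush = now
	for task, n := range f.pending {
		emit(task, n)
		f.pending[task] = 0 // reset so the next flush emits only the delta
	}
}

func main() {
	f := &throttledFlusher{interval: time.Second, pending: map[string]int{}}
	f.bump("burstBalance")
	f.bump("burstBalance")
	f.maybeFlush(func(task string, n int) { fmt.Println(task, n) })
}
```

Note that with a zero `lastCollectTime`, the first poll after startup flushes immediately; after that, flushes are spaced at least `metricsInterval` apart.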
35 changes: 25 additions & 10 deletions cdc/scheduler/internal/tp/coordinator_test.go
@@ -74,7 +74,7 @@ func TestCoordinatorSendMsgs(t *testing.T) {
captureID: "0",
trans: trans,
}
- coord.captureM = newCaptureManager(coord.revision, 0)
+ coord.captureM = newCaptureManager(model.ChangeFeedID{}, coord.revision, 0)
coord.sendMsgs(
ctx, []*schedulepb.Message{{To: "1", MsgType: schedulepb.MsgDispatchTableRequest}})

@@ -145,7 +145,7 @@ func TestCoordinatorRecvMsgs(t *testing.T) {
func TestCoordinatorHeartbeat(t *testing.T) {
t.Parallel()

coord := newCoordinator("a", 1, &config.SchedulerConfig{
coord := newCoordinator("a", model.ChangeFeedID{}, 1, &config.SchedulerConfig{
HeartbeatTick: math.MaxInt,
MaxTaskConcurrency: 1,
})
@@ -201,7 +201,7 @@

func TestCoordinatorAddCapture(t *testing.T) {
t.Parallel()
coord := newCoordinator("a", 1, &config.SchedulerConfig{
coord := newCoordinator("a", model.ChangeFeedID{}, 1, &config.SchedulerConfig{
HeartbeatTick: math.MaxInt,
MaxTaskConcurrency: 1,
})
@@ -258,7 +258,7 @@ func TestCoordinatorAddCapture(t *testing.T) {
func TestCoordinatorRemoveCapture(t *testing.T) {
t.Parallel()

coord := newCoordinator("a", 1, &config.SchedulerConfig{
coord := newCoordinator("a", model.ChangeFeedID{}, 1, &config.SchedulerConfig{
HeartbeatTick: math.MaxInt,
MaxTaskConcurrency: 1,
})
@@ -341,9 +341,14 @@ func BenchmarkCoordinatorInit(b *testing.B) {
coord = &coordinator{
trans: &mockTrans{},
schedulers: schedulers,
- replicationM: newReplicationManager(10),
+ replicationM: newReplicationManager(10, model.ChangeFeedID{}),
// Disable heartbeat.
- captureM: newCaptureManager(schedulepb.OwnerRevision{}, math.MaxInt),
+ captureM: newCaptureManager(
+ model.ChangeFeedID{}, schedulepb.OwnerRevision{}, math.MaxInt),
+ tasksCounter: make(map[struct {
+ scheduler string
+ task string
+ }]int),
}
name = fmt.Sprintf("InitTable %d", total)
return name, coord, currentTables, captures
@@ -360,7 +365,8 @@ func BenchmarkCoordinatorHeartbeat(b *testing.B) {
const captureCount = 8
captures = map[model.CaptureID]*model.CaptureInfo{}
// Always heartbeat.
- captureM := newCaptureManager(schedulepb.OwnerRevision{}, 0)
+ captureM := newCaptureManager(
+ model.ChangeFeedID{}, schedulepb.OwnerRevision{}, 0)
captureM.initialized = true
for i := 0; i < captureCount; i++ {
captures[fmt.Sprint(i)] = &model.CaptureInfo{}
@@ -375,8 +381,12 @@
coord = &coordinator{
trans: &mockTrans{},
schedulers: schedulers,
- replicationM: newReplicationManager(10),
+ replicationM: newReplicationManager(10, model.ChangeFeedID{}),
captureM: captureM,
+ tasksCounter: make(map[struct {
+ scheduler string
+ task string
+ }]int),
}
name = fmt.Sprintf("Heartbeat %d", total)
return name, coord, currentTables, captures
@@ -393,13 +403,14 @@ func BenchmarkCoordinatorHeartbeatResponse(b *testing.B) {
const captureCount = 8
captures = map[model.CaptureID]*model.CaptureInfo{}
// Disable heartbeat.
- captureM := newCaptureManager(schedulepb.OwnerRevision{}, math.MaxInt)
+ captureM := newCaptureManager(
+ model.ChangeFeedID{}, schedulepb.OwnerRevision{}, math.MaxInt)
captureM.initialized = true
for i := 0; i < captureCount; i++ {
captures[fmt.Sprint(i)] = &model.CaptureInfo{}
captureM.Captures[fmt.Sprint(i)] = &CaptureStatus{State: CaptureStateInitialized}
}
- replicationM := newReplicationManager(10)
+ replicationM := newReplicationManager(10, model.ChangeFeedID{})
currentTables = make([]model.TableID, 0, total)
heartbeatResp := make(map[model.CaptureID]*schedulepb.Message)
for i := 0; i < total; i++ {
@@ -447,6 +458,10 @@ func BenchmarkCoordinatorHeartbeatResponse(b *testing.B) {
schedulers: schedulers,
replicationM: replicationM,
captureM: captureM,
+ tasksCounter: make(map[struct {
+ scheduler string
+ task string
+ }]int),
}
name = fmt.Sprintf("HeartbeatResponse %d", total)
return name, coord, currentTables, captures