Skip to content

Commit

Permalink
Merge pull request #6787 from planetscale/rn-vr-metrics
Browse files Browse the repository at this point in the history
vrepl metrics: add metrics around vstreamer and vreplication
  • Loading branch information
deepthi authored Oct 6, 2020
2 parents a0a72cb + 5ad645c commit 7c0cf0f
Show file tree
Hide file tree
Showing 10 changed files with 74 additions and 26 deletions.
3 changes: 2 additions & 1 deletion go/vt/binlog/binlogplayer/binlog_player.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ type Stats struct {
QueryCount *stats.CountersWithSingleLabel
CopyRowCount *stats.Counter
CopyLoopCount *stats.Counter
ErrorCounts *stats.CountersWithMultiLabels
}

// SetLastPosition sets the last replication position.
Expand Down Expand Up @@ -129,7 +130,7 @@ func NewStats() *Stats {
bps.QueryCount = stats.NewCountersWithSingleLabel("", "", "Phase", "")
bps.CopyRowCount = stats.NewCounter("", "")
bps.CopyLoopCount = stats.NewCounter("", "")

bps.ErrorCounts = stats.NewCountersWithMultiLabels("", "", []string{"type"})
return bps
}

Expand Down
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func (ct *controller) run(ctx context.Context) {
default:
}
log.Errorf("stream %v: %v, retrying after %v", ct.id, err, *retryDelay)
ct.blpStats.ErrorCounts.Add([]string{"Stream Error"}, 1)
timer := time.NewTimer(*retryDelay)
select {
case <-ctx.Done():
Expand Down Expand Up @@ -192,6 +193,7 @@ func (ct *controller) runBlp(ctx context.Context) (err error) {
log.Infof("trying to find a tablet eligible for vreplication. stream id: %v", ct.id)
tablet, err = ct.tabletPicker.PickForStreaming(ctx)
if err != nil {
ct.blpStats.ErrorCounts.Add([]string{"No Source Tablet Found"}, 1)
return err
}
log.Infof("found a tablet eligible for vreplication. stream id: %v tablet: %s", ct.id, tablet.Alias.String())
Expand All @@ -203,6 +205,7 @@ func (ct *controller) runBlp(ctx context.Context) (err error) {
// Table names can have search patterns. Resolve them against the schema.
tables, err := mysqlctl.ResolveTables(ctx, ct.mysqld, dbClient.DBName(), ct.source.Tables)
if err != nil {
ct.blpStats.ErrorCounts.Add([]string{"Invalid Source"}, 1)
return vterrors.Wrap(err, "failed to resolve table names")
}

Expand Down Expand Up @@ -241,6 +244,7 @@ func (ct *controller) runBlp(ctx context.Context) (err error) {
vr := newVReplicator(ct.id, &ct.source, vsClient, ct.blpStats, dbClient, ct.mysqld, ct.vre)
return vr.Replicate(ctx)
}
ct.blpStats.ErrorCounts.Add([]string{"Invalid Source"}, 1)
return fmt.Errorf("missing source")
}

Expand Down
15 changes: 15 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,21 @@ func (st *vrStats) register() {
}
return result
})
stats.NewCountersFuncWithMultiLabels(
"VReplicationErrors",
"Errors during vreplication",
[]string{"workflow", "type"},
func() map[string]int64 {
st.mu.Lock()
defer st.mu.Unlock()
result := make(map[string]int64)
for _, ct := range st.controllers {
for key, val := range ct.blpStats.ErrorCounts.Counts() {
result[fmt.Sprintf("%d_%s", ct.id, key)] = val
}
}
return result
})
}

func (st *vrStats) numControllers() int64 {
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func (vp *vplayer) play(ctx context.Context) error {

plan, err := buildReplicatorPlan(vp.vr.source.Filter, vp.vr.pkInfoMap, vp.copyState)
if err != nil {
vp.vr.stats.ErrorCounts.Add([]string{"Plan"}, 1)
return err
}
vp.replicatorPlan = plan
Expand Down Expand Up @@ -366,6 +367,8 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
}
}
if err := vp.applyEvent(ctx, event, mustSave); err != nil {
vp.vr.stats.ErrorCounts.Add([]string{"Apply"}, 1)
log.Errorf("Error applying event: %s", err.Error())
return err
}
}
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ func newVReplicator(id uint32, source *binlogdatapb.BinlogSource, sourceVStreame
func (vr *vreplicator) Replicate(ctx context.Context) error {
err := vr.replicate(ctx)
if err != nil {
log.Errorf("Replicate error: %s", err.Error())
if err := vr.setMessage(err.Error()); err != nil {
log.Errorf("Failed to set error state: %v", err)
}
Expand Down Expand Up @@ -165,10 +166,12 @@ func (vr *vreplicator) replicate(ctx context.Context) error {
return err
}
if err := newVCopier(vr).copyNext(ctx, settings); err != nil {
vr.stats.ErrorCounts.Add([]string{"Copy"}, 1)
return err
}
case settings.StartPos.IsZero():
if err := newVCopier(vr).initTablesForCopy(ctx); err != nil {
vr.stats.ErrorCounts.Add([]string{"Copy"}, 1)
return err
}
default:
Expand All @@ -180,6 +183,7 @@ func (vr *vreplicator) replicate(ctx context.Context) error {
return vr.setState(binlogplayer.BlpStopped, "Stopped after copy.")
}
if err := vr.setState(binlogplayer.BlpRunning, ""); err != nil {
vr.stats.ErrorCounts.Add([]string{"Replicate"}, 1)
return err
}
return newVPlayer(vr, settings, nil, mysql.Position{}, "replicate").play(ctx)
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vttablet/tabletserver/vstreamer/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func (uvs *uvstreamer) copy(ctx context.Context) error {
tableName := uvs.tablesToCopy[0]
log.V(2).Infof("Copystate not empty starting catchupAndCopy on table %s", tableName)
if err := uvs.catchupAndCopy(ctx, tableName); err != nil {
uvs.vse.errorCounts.Add("Copy", 1)
return err
}
}
Expand All @@ -50,6 +51,7 @@ func (uvs *uvstreamer) catchupAndCopy(ctx context.Context, tableName string) err
if !uvs.pos.IsZero() {
if err := uvs.catchup(ctx); err != nil {
log.Infof("catchupAndCopy: catchup returned %v", err)
uvs.vse.errorCounts.Add("Catchup", 1)
return err
}
}
Expand Down Expand Up @@ -265,6 +267,7 @@ func (uvs *uvstreamer) copyTable(ctx context.Context, tableName string) error {
return nil
})
if err != nil {
uvs.vse.errorCounts.Add("StreamRows", 1)
return err
}

Expand Down
38 changes: 22 additions & 16 deletions go/vt/vttablet/tabletserver/vstreamer/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,17 @@ type Engine struct {
vschemaUpdates *stats.Counter

// vstreamer metrics
vstreamerPhaseTimings *servenv.TimingsWrapper
vstreamerEventsStreamed *stats.Counter
vstreamerPacketSize *stats.GaugeFunc
vstreamerNumPackets *stats.Counter
resultStreamerNumRows *stats.Counter
resultStreamerNumPackets *stats.Counter
rowStreamerNumRows *stats.Counter
rowStreamerNumPackets *stats.Counter
vstreamerPhaseTimings *servenv.TimingsWrapper
vstreamerEventsStreamed *stats.Counter
vstreamerPacketSize *stats.GaugeFunc
vstreamerNumPackets *stats.Counter
resultStreamerNumRows *stats.Counter
resultStreamerNumPackets *stats.Counter
rowStreamerNumRows *stats.Counter
rowStreamerNumPackets *stats.Counter
errorCounts *stats.CountersWithSingleLabel
vstreamersCreated *stats.Counter
vstreamersEndedWithErrors *stats.Counter
}

// NewEngine creates a new Engine.
Expand All @@ -102,14 +105,17 @@ func NewEngine(env tabletenv.Env, ts srvtopo.Server, se *schema.Engine, cell str
vschemaErrors: env.Exporter().NewCounter("VSchemaErrors", "Count of VSchema errors"),
vschemaUpdates: env.Exporter().NewCounter("VSchemaUpdates", "Count of VSchema updates. Does not include errors"),

vstreamerPhaseTimings: env.Exporter().NewTimings("VStreamerPhaseTiming", "Time taken for different phases during vstream copy", "phase-timing"),
vstreamerEventsStreamed: env.Exporter().NewCounter("VStreamerEventsStreamed", "Count of events streamed in VStream API"),
vstreamerPacketSize: env.Exporter().NewGaugeFunc("VStreamPacketSize", "Max packet size for sending vstreamer events", getPacketSize),
vstreamerNumPackets: env.Exporter().NewCounter("VStreamerNumPackets", "Number of packets in vstreamer"),
resultStreamerNumPackets: env.Exporter().NewCounter("ResultStreamerNumPackets", "Number of packets in result streamer"),
resultStreamerNumRows: env.Exporter().NewCounter("ResultStreamerNumRows", "Number of rows sent in result streamer"),
rowStreamerNumPackets: env.Exporter().NewCounter("RowStreamerNumPackets", "Number of packets in row streamer"),
rowStreamerNumRows: env.Exporter().NewCounter("RowStreamerNumRows", "Number of rows sent in row streamer"),
vstreamerPhaseTimings: env.Exporter().NewTimings("VStreamerPhaseTiming", "Time taken for different phases during vstream copy", "phase-timing"),
vstreamerEventsStreamed: env.Exporter().NewCounter("VStreamerEventsStreamed", "Count of events streamed in VStream API"),
vstreamerPacketSize: env.Exporter().NewGaugeFunc("VStreamPacketSize", "Max packet size for sending vstreamer events", getPacketSize),
vstreamerNumPackets: env.Exporter().NewCounter("VStreamerNumPackets", "Number of packets in vstreamer"),
resultStreamerNumPackets: env.Exporter().NewCounter("ResultStreamerNumPackets", "Number of packets in result streamer"),
resultStreamerNumRows: env.Exporter().NewCounter("ResultStreamerNumRows", "Number of rows sent in result streamer"),
rowStreamerNumPackets: env.Exporter().NewCounter("RowStreamerNumPackets", "Number of packets in row streamer"),
rowStreamerNumRows: env.Exporter().NewCounter("RowStreamerNumRows", "Number of rows sent in row streamer"),
vstreamersCreated: env.Exporter().NewCounter("VStreamersCreated", "Count of vstreamers created"),
vstreamersEndedWithErrors: env.Exporter().NewCounter("VStreamersEndedWithErrors", "Count of vstreamers that ended with errors"),
errorCounts: env.Exporter().NewCountersWithSingleLabel("VStreamerErrors", "Tracks errors in vstreamer", "type", "Catchup", "Copy", "Send", "TablePlan"),
}
env.Exporter().HandleFunc("/debug/tablet_vschema", vse.ServeHTTP)
return vse
Expand Down
7 changes: 6 additions & 1 deletion go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,9 @@ func (uvs *uvstreamer) send2(evs []*binlogdatapb.VEvent) error {
}
}
}
if err != nil {
uvs.vse.errorCounts.Add("Send", 1)
}
return err
}

Expand All @@ -298,7 +301,7 @@ func (uvs *uvstreamer) sendEventsForCurrentPos() error {
Type: binlogdatapb.VEventType_OTHER,
}}
if err := uvs.send(evs); err != nil {
return wrapError(err, uvs.pos)
return wrapError(err, uvs.pos, uvs.vse)
}
return nil
}
Expand All @@ -320,6 +323,7 @@ func (uvs *uvstreamer) setStreamStartPosition() error {
return vterrors.Wrap(err, "could not decode position")
}
if !curPos.AtLeast(pos) {
uvs.vse.errorCounts.Add("GTIDSet Mismatch", 1)
return fmt.Errorf("GTIDSet Mismatch: requested source position:%v, current target vrep position: %v", mysql.EncodePosition(pos), mysql.EncodePosition(curPos))
}
uvs.pos = pos
Expand Down Expand Up @@ -362,6 +366,7 @@ func (uvs *uvstreamer) Stream() error {
log.Info("TablePKs is not nil: starting vs.copy()")
if err := uvs.copy(uvs.ctx); err != nil {
log.Infof("uvstreamer.Stream() copy returned with err %s", err)
uvs.vse.errorCounts.Add("Copy", 1)
return err
}
uvs.sendTestEvent("Copy Done")
Expand Down
2 changes: 0 additions & 2 deletions go/vt/vttablet/tabletserver/vstreamer/uvstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,6 @@ func resetMetrics(t *testing.T) {
engine.resultStreamerNumRows.Reset()
engine.rowStreamerNumRows.Reset()
engine.vstreamerPhaseTimings.Reset()
engine.vstreamerPhaseTimings.Reset()
engine.vstreamerPhaseTimings.Reset()
}

func validateMetrics(t *testing.T) {
Expand Down
21 changes: 15 additions & 6 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,12 @@ func (vs *vstreamer) Stream() error {
//defer vs.cancel()
ctx := context.Background()
defer ctx.Done()
vs.vse.vstreamersCreated.Add(1)
log.Infof("Starting Stream() with startPos %s", vs.startPos)
pos, err := mysql.DecodePosition(vs.startPos)
if err != nil {
vs.vse.errorCounts.Add("StreamRows", 1)
vs.vse.vstreamersEndedWithErrors.Add(1)
return err
}
vs.pos = pos
Expand All @@ -161,21 +164,21 @@ func (vs *vstreamer) replicate(ctx context.Context) error {
// Ensure se is Open. If vttablet came up in a non_serving role,
// the schema engine may not have been initialized.
if err := vs.se.Open(); err != nil {
return wrapError(err, vs.pos)
return wrapError(err, vs.pos, vs.vse)
}

conn, err := binlog.NewBinlogConnection(vs.cp)
if err != nil {
return wrapError(err, vs.pos)
return wrapError(err, vs.pos, vs.vse)
}
defer conn.Close()

events, err := conn.StartBinlogDumpFromPosition(vs.ctx, vs.pos)
if err != nil {
return wrapError(err, vs.pos)
return wrapError(err, vs.pos, vs.vse)
}
err = vs.parseEvents(vs.ctx, events)
return wrapError(err, vs.pos)
return wrapError(err, vs.pos, vs.vse)
}

// parseEvents parses and sends events.
Expand Down Expand Up @@ -252,6 +255,7 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
case binlogdatapb.VEventType_SAVEPOINT:
bufferedEvents = append(bufferedEvents, vevent)
default:
vs.vse.errorCounts.Add("BufferAndTransmit", 1)
return fmt.Errorf("unexpected event: %v", vevent)
}
return nil
Expand Down Expand Up @@ -280,13 +284,15 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
}
vevents, err := vs.parseEvent(ev)
if err != nil {
vs.vse.errorCounts.Add("ParseEvent", 1)
return err
}
for _, vevent := range vevents {
if err := bufferAndTransmit(vevent); err != nil {
if err == io.EOF {
return nil
}
vs.vse.errorCounts.Add("BufferAndTransmit", 1)
return fmt.Errorf("error sending event: %v", err)
}
}
Expand All @@ -308,6 +314,7 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
if err == io.EOF {
return nil
}
vs.vse.errorCounts.Add("Send", 1)
return fmt.Errorf("error sending event: %v", err)
}
}
Expand Down Expand Up @@ -503,6 +510,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e

vevent, err := vs.buildTablePlan(id, tm)
if err != nil {
vs.vse.errorCounts.Add("TablePlan", 1)
return nil, err
}
if vevent != nil {
Expand Down Expand Up @@ -538,7 +546,6 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e
if err != nil {
return nil, err
}

}
for _, vevent := range vevents {
vevent.Timestamp = int64(ev.Timestamp())
Expand Down Expand Up @@ -807,8 +814,10 @@ func (vs *vstreamer) extractRowAndFilter(plan *streamerPlan, data []byte, dataCo
return plan.filter(values)
}

func wrapError(err error, stopPos mysql.Position) error {
func wrapError(err error, stopPos mysql.Position, vse *Engine) error {
if err != nil {
vse.vstreamersEndedWithErrors.Add(1)
vse.errorCounts.Add("StreamEnded", 1)
err = fmt.Errorf("stream (at source tablet) error @ %v: %v", stopPos, err)
log.Error(err)
return err
Expand Down

0 comments on commit 7c0cf0f

Please sign in to comment.