
Bug Report: VReplication lag is not updated when vplayer is throttled #16575

Closed
mattlord opened this issue Aug 9, 2024 · 0 comments · Fixed by #16577
Labels: Component: Observability, Component: VReplication, Type: Bug


mattlord commented Aug 9, 2024

Overview of the Issue

There is a for loop in the vplayer (the component that applies binlog events streamed from the vstreamer) where we process events and, as we do, update the vreplication lag:

for {
	if ctx.Err() != nil {
		return ctx.Err()
	}
	// Check throttler.
	if checkResult, ok := vp.vr.vre.throttlerClient.ThrottleCheckOKOrWaitAppName(ctx, throttlerapp.Name(vp.throttlerAppName)); !ok {
		_ = vp.vr.updateTimeThrottled(throttlerapp.VPlayerName, checkResult.Summary())
		continue
	}
	items, err := relay.Fetch()
	if err != nil {
		return err
	}
	// No events were received. This likely means that there's a network partition.
	// So, we should assume we're falling behind.
	if len(items) == 0 {
		behind := time.Now().UnixNano() - vp.lastTimestampNs - vp.timeOffsetNs
		vp.vr.stats.ReplicationLagSeconds.Store(behind / 1e9)
		vp.vr.stats.VReplicationLags.Add(strconv.Itoa(int(vp.vr.id)), time.Duration(behind/1e9)*time.Second)
	}
	// Empty transactions are saved at most once every idleTimeout.
	// This covers two situations:
	// 1. Fetch was idle for idleTimeout.
	// 2. We've been receiving empty events for longer than idleTimeout.
	// In both cases, now > timeLastSaved. If so, the GTID of the last unsavedEvent
	// must be saved.
	if time.Since(vp.timeLastSaved) >= idleTimeout && vp.unsavedEvent != nil {
		posReached, err := vp.updatePos(ctx, vp.unsavedEvent.Timestamp)
		if err != nil {
			return err
		}
		if posReached {
			// Unreachable.
			return nil
		}
	}
	for i, events := range items {
		for j, event := range events {
			if event.Timestamp != 0 {
				vp.lastTimestampNs = event.Timestamp * 1e9
				vp.timeOffsetNs = time.Now().UnixNano() - event.CurrentTime
				sbm = event.CurrentTime/1e9 - event.Timestamp
			}
			mustSave := false
			switch event.Type {
			case binlogdatapb.VEventType_COMMIT:
				// If we've reached the stop position, we must save the current commit
				// even if it's empty. So, the next applyEvent is invoked with the
				// mustSave flag.
				if !vp.stopPos.IsZero() && vp.pos.AtLeast(vp.stopPos) {
					mustSave = true
					break
				}
				// In order to group multiple commits into a single one, we look ahead for
				// the next commit. If there is one, we skip the current commit, which ends up
				// applying the next set of events as part of the current transaction. This approach
				// also handles the case where the last transaction is partial. In that case,
				// we only group the transactions with commits we've seen so far.
				if hasAnotherCommit(items, i, j+1) {
					continue
				}
			}
			if err := vp.applyEvent(ctx, event, mustSave); err != nil {
				if err != io.EOF {
					vp.vr.stats.ErrorCounts.Add([]string{"Apply"}, 1)
					var table, tableLogMsg string
					switch {
					case event.GetFieldEvent() != nil:
						table = event.GetFieldEvent().TableName
					case event.GetRowEvent() != nil:
						table = event.GetRowEvent().TableName
					}
					if table != "" {
						tableLogMsg = fmt.Sprintf(" for table %s", table)
					}
					log.Errorf("Error applying event%s: %s", tableLogMsg, err.Error())
				}
				return err
			}
		}
	}
	if sbm >= 0 {
		vp.vr.stats.ReplicationLagSeconds.Store(sbm)
		vp.vr.stats.VReplicationLags.Add(strconv.Itoa(int(vp.vr.id)), time.Duration(sbm)*time.Second)
	}
}
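
(Note that sbm, the seconds-behind value, is used in this excerpt but declared before the loop; the snippet starts at the top of the loop body.)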

If the vplayer is throttled for some time, however, we are stuck at the top of that for loop and never reach the bottom, where the lag value is updated based on the events just processed:

for {
	if ctx.Err() != nil {
		return ctx.Err()
	}
	// Check throttler.
	if checkResult, ok := vp.vr.vre.throttlerClient.ThrottleCheckOKOrWaitAppName(ctx, throttlerapp.Name(vp.throttlerAppName)); !ok {
		_ = vp.vr.updateTimeThrottled(throttlerapp.VPlayerName, checkResult.Summary())
		continue
	}

Because we're not processing events for however long we're fully throttled (which can be indefinite), we're not updating the vreplication lag. Say the last time we processed an event the lag was 0 seconds, and we're then fully throttled, unable to process any more events, for the next 15 minutes: neither the system nor the operator is aware of the growing vreplication lag, and the value suddenly shoots up from 0 seconds to 900 seconds.
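
To make that jump concrete, here is a minimal, self-contained sketch of the arithmetic; the values are hypothetical, and none of this is vplayer code:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Lag reported just before throttling kicks in.
	lastReportedLag := int64(0) // seconds

	// The vplayer is then fully throttled and applies nothing.
	throttledFor := 15 * time.Minute

	// While throttled, the metric is never updated, so it stays at 0.
	fmt.Println("lag reported during throttling:", lastReportedLag) // 0

	// Once events flow again, the first applied event carries a timestamp
	// roughly 15 minutes in the past, so the metric jumps in a single update.
	lagAfterThrottle := int64(throttledFor / time.Second)
	fmt.Println("lag reported after throttling:", lagAfterThrottle) // 900
}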

This is obviously wrong. It can mean you only become aware of the issue once it has grown into a bigger problem, when being alerted immediately might have prompted you to explicitly lessen the throttling (altogether, for vreplication, or specifically for the vplayer). It can also cause unnecessary concern as the lag unexpectedly fluctuates wildly, when perhaps you really do want vreplication to be deferred/throttled.

We currently have code in place that estimates the vreplication lag when we're not receiving any events from the vstreamer (perhaps we're unable to communicate, or perhaps the sender/vstreamer is itself throttled):

// No events were received. This likely means that there's a network partition.
// So, we should assume we're falling behind.
if len(items) == 0 {
	behind := time.Now().UnixNano() - vp.lastTimestampNs - vp.timeOffsetNs
	vp.vr.stats.ReplicationLagSeconds.Store(behind / 1e9)
	vp.vr.stats.VReplicationLags.Add(strconv.Itoa(int(vp.vr.id)), time.Duration(behind/1e9)*time.Second)
}
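
For reference, that estimate boils down to "now, minus the commit timestamp of the last applied event, corrected for clock skew between source and target." A standalone sketch of the computation; the struct and names here are hypothetical stand-ins for the vplayer fields, not the real types:

package main

import (
	"fmt"
	"time"
)

// lagEstimator mirrors the two vplayer fields the estimate relies on.
type lagEstimator struct {
	lastTimestampNs int64 // commit timestamp of the last applied event, in ns
	timeOffsetNs    int64 // measured skew between our clock and the source's
}

// behindSeconds returns the estimated lag in whole seconds, matching the
// integer division by 1e9 used for ReplicationLagSeconds above.
func (e *lagEstimator) behindSeconds(now time.Time) int64 {
	behind := now.UnixNano() - e.lastTimestampNs - e.timeOffsetNs
	return behind / 1e9
}

func main() {
	// An event committed 20s ago on the source, with no measured clock skew.
	e := &lagEstimator{lastTimestampNs: time.Now().Add(-20 * time.Second).UnixNano()}
	fmt.Println(e.behindSeconds(time.Now())) // ~20
}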

We also need to do that when we're throttled. It may be as simple as this:

diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go
index 31e26c30e8..0444038924 100644
--- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go
+++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go
@@ -476,6 +476,12 @@ func (vp *vplayer) recordHeartbeat() error {
 func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
        defer vp.vr.dbClient.Rollback()

+       estimateLag := func() {
+               behind := time.Now().UnixNano() - vp.lastTimestampNs - vp.timeOffsetNs
+               vp.vr.stats.ReplicationLagSeconds.Store(behind / 1e9)
+               vp.vr.stats.VReplicationLags.Add(strconv.Itoa(int(vp.vr.id)), time.Duration(behind/1e9)*time.Second)
+       }
+
        // If we're not running, set ReplicationLagSeconds to be very high.
        // TODO(sougou): if we also stored the time of the last event, we
        // can estimate this value more accurately.
@@ -489,6 +495,7 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
                // Check throttler.
                if checkResult, ok := vp.vr.vre.throttlerClient.ThrottleCheckOKOrWaitAppName(ctx, throttlerapp.Name(vp.throttlerAppName)); !ok {
                        _ = vp.vr.updateTimeThrottled(throttlerapp.VPlayerName, checkResult.Summary())
+                       estimateLag()
                        continue
                }

@@ -499,9 +506,7 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
                // No events were received. This likely means that there's a network partition.
                // So, we should assume we're falling behind.
                if len(items) == 0 {
-                       behind := time.Now().UnixNano() - vp.lastTimestampNs - vp.timeOffsetNs
-                       vp.vr.stats.ReplicationLagSeconds.Store(behind / 1e9)
-                       vp.vr.stats.VReplicationLags.Add(strconv.Itoa(int(vp.vr.id)), time.Duration(behind/1e9)*time.Second)
+                       estimateLag()
                }
                // Empty transactions are saved at most once every idleTimeout.
                // This covers two situations:

Reproduction Steps

git checkout main

make build

cd examples/local

./101_initial_cluster.sh; ./201_customer_tablets.sh; ./202_move_tables.sh

vtctldclient UpdateThrottlerConfig --enable --throttle-app="all" --throttle-app-ratio 1 --throttle-app-duration 4h customer

sleep 10

primaryuid=$(vtctldclient GetTablets --keyspace customer --tablet-type primary --shard "0" | awk '{print $1}' | cut -d- -f2 | bc)

mysql < ../common/insert_commerce_data.sql

sleep 5

vtctldclient MoveTables --target-keyspace customer --workflow commerce2customer show --compact --include-logs=false | jq ".workflows[0].shard_streams.\"0/zone1-0000000${primaryuid}\".streams[0].throttler_status"

sleep 5

curl -s "http://localhost:15${primaryuid}/debug/vars" | jq '.VReplicationLagSeconds'

End result on main:

{
  "component_throttled": "vplayer",
  "time_throttled": {
    "seconds": "1723310863"
  }
}
{
  "commerce.0.commerce2customer.1": 0
}

End result with the proposed patch:

{
  "component_throttled": "vplayer",
  "time_throttled": {
    "seconds": "1723310579"
  }
}
{
  "commerce.0.commerce2customer.1": 20
}

Binary Version

vtgate version Version: 21.0.0-SNAPSHOT (Git revision 3cfb08c45ec995f347b95cb91a56b36a3c5b6b56 branch 'ws_logger') built on Thu Aug  8 23:36:29 EDT 2024 by matt@pslord.local using go1.22.5 darwin/arm64

Operating System and Environment details

N/A

Log Fragments

No response
